[jira] [Commented] (KAFKA-15843) Review consumer onPartitionsAssigned called with empty partitions

2023-12-07 Thread Lianet Magrans (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15843?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17794436#comment-17794436
 ] 

Lianet Magrans commented on KAFKA-15843:


Hey [~ableegoldman], thanks a lot for the great context, really helpful. Your 
comments align with what we thought: with the new consumer (only the new one, 
which supports only the new protocol), we could consider making the behavior 
consistent and invoking all callbacks only when partitions are involved. We do 
need to consider a client's need to know when a rebalance started or ended, 
though, since with the KIP `ConsumerPartitionAssignor` will be deprecated. I'll 
address this in sync with KStreams to better understand how they use this and 
then evaluate how to move in this direction with the new consumer. Thanks!

> Review consumer onPartitionsAssigned called with empty partitions
> -
>
>                 Key: KAFKA-15843
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15843
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: clients, consumer
>            Reporter: Lianet Magrans
>            Priority: Major
>              Labels: kip-848, kip-848-client-support, kip-848-e2e, kip-848-preview
>
> The legacy coordinator triggers onPartitionsAssigned with an empty assignment 
> (which is not the case when triggering onPartitionsRevoked or 
> onPartitionsLost), and the new consumer implementation maintains the same 
> behaviour. We should review this to fully understand whether it is really 
> necessary to call onPartitionsAssigned with an empty assignment (or whether 
> it should behave consistently with onPartitionsRevoked/onPartitionsLost).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15843) Review consumer onPartitionsAssigned called with empty partitions

2023-11-19 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15843?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17787681#comment-17787681
 ] 

A. Sophie Blee-Goldman commented on KAFKA-15843:


Hey [~lianetm] I worked on the old ConsumerRebalanceListener a lot and can 
provide some context here. The reason #onPartitionsAssigned is still called on 
an empty set of partitions is largely historical, and the tl;dr is that it's 
probably ok to change this behavior in the new consumer if it won't impact the 
older one. 

For some background, in the old days of eager rebalancing (which is still the 
default protocol in the original consumer client), we would always invoke both 
#onPartitionsRevoked and #onPartitionsAssigned at the start and end of a 
rebalance, respectively. And since all partitions are revoked and re-assigned 
with eager rebalancing, there (usually) was a non-empty set of partitions 
passed into each of these.

Then came incremental cooperative rebalancing: we no longer revoked & 
reassigned all the partitions and instead acted only on the incremental change 
in partition assignment. So #onPartitionsRevoked only gets the subset of 
partitions that are being migrated to a different consumer, and 
#onPartitionsAssigned only gets newly-added partitions. Also, with the 
cooperative protocol, #onPartitionsRevoked would be invoked at the _end_ of a 
rebalance, rather than at the beginning.
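
To make the incremental change concrete, here is a minimal sketch of how the 
sets passed to the two callbacks could be derived from the previous and the new 
assignment. It is only an illustration: `AssignmentDiff` is a hypothetical 
helper, not part of the Kafka client, and partitions are modelled as plain 
strings rather than `TopicPartition` objects.

```java
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper for illustration only; not part of the Kafka client API.
final class AssignmentDiff {

    // Partitions owned before the rebalance but missing from the new assignment.
    // Under cooperative rebalancing only this subset reaches #onPartitionsRevoked.
    static Set<String> revoked(Set<String> owned, Set<String> assigned) {
        Set<String> result = new TreeSet<>(owned);
        result.removeAll(assigned);
        return result;
    }

    // Partitions in the new assignment that were not owned before.
    // Under cooperative rebalancing only this subset reaches #onPartitionsAssigned.
    static Set<String> added(Set<String> owned, Set<String> assigned) {
        Set<String> result = new TreeSet<>(assigned);
        result.removeAll(owned);
        return result;
    }

    public static void main(String[] args) {
        Set<String> owned = Set.of("t-0", "t-1", "t-2");
        Set<String> assigned = Set.of("t-1", "t-2", "t-3");
        System.out.println("revoked=" + revoked(owned, assigned));
        System.out.println("added=" + added(owned, assigned));
    }
}
```

With eager rebalancing the equivalent would simply be the full old assignment 
for the revoke callback and the full new assignment for the assign callback.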

However, we still had to maintain compatibility across the two protocols for 
those implementing ConsumerRebalanceListener. And it was common to use the 
rebalance listener not just to listen in on the partition assignment, but also 
to get notified about the start and end of a rebalance. Therefore we decided to 
guarantee that #onPartitionsAssigned would still be invoked at the end of every 
rebalance, in case users rely on this callback to detect the end of a 
rebalance. However, since #onPartitionsRevoked is no longer even invoked at the 
start of a cooperative rebalance, it can't be used to detect the start of one 
anymore, and there was no reason to continue invoking it on every rebalance 
unless some partitions were actually revoked. You'll notice that if the eager 
protocol is still enabled, the #onPartitionsRevoked callback actually is still 
invoked regardless of whether there's a non-empty set of partitions passed into 
it or not.
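
As a rough illustration of the usage pattern described above, the following 
sketch shows a listener that treats every #onPartitionsAssigned call, including 
one with an empty collection, as the end-of-rebalance signal. To keep it 
self-contained, `RebalanceCallbacks` is a hypothetical stand-in that only 
mirrors two callback names of ConsumerRebalanceListener, and partitions are 
plain strings; a real listener would implement the Kafka interface instead.

```java
import java.util.Collection;
import java.util.List;

// Hypothetical stand-in mirroring two ConsumerRebalanceListener callbacks,
// so the sketch compiles without the kafka-clients dependency.
interface RebalanceCallbacks {
    void onPartitionsRevoked(Collection<String> partitions);
    void onPartitionsAssigned(Collection<String> partitions);
}

// Counts completed rebalances by relying on the guarantee that
// onPartitionsAssigned is invoked at the end of every rebalance.
final class RebalanceEndTracker implements RebalanceCallbacks {
    private int completedRebalances = 0;
    private int partitionsGained = 0;

    @Override
    public void onPartitionsRevoked(Collection<String> partitions) {
        // Not used as a rebalance signal here: per the thread, the cooperative
        // protocol only invokes it when partitions were actually revoked.
    }

    @Override
    public void onPartitionsAssigned(Collection<String> partitions) {
        // An empty collection still marks the end of a rebalance.
        completedRebalances++;
        partitionsGained += partitions.size();
    }

    int completedRebalances() {
        return completedRebalances;
    }

    int partitionsGained() {
        return partitionsGained;
    }

    public static void main(String[] args) {
        RebalanceEndTracker tracker = new RebalanceEndTracker();
        tracker.onPartitionsAssigned(List.of());            // rebalance with no new partitions
        tracker.onPartitionsAssigned(List.of("orders-0"));  // rebalance that added one partition
        System.out.println("rebalances=" + tracker.completedRebalances()
                + " gained=" + tracker.partitionsGained());
    }
}
```

A listener written this way would stop seeing some rebalances if the callback 
were only invoked with non-empty assignments, which is the compatibility 
concern raised here.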

#onPartitionsLost is a bit of a special case, since (a) it was only added 
around the time cooperative rebalancing was implemented, so there was no old 
behavior for us to maintain compatibility with, and (b) it doesn't happen 
during a regular rebalance but instead only notifies the rebalance listener of 
a special case, i.e. that it has lost ownership of these partitions (and for 
that exact reason cannot commit offsets for them, as would normally occur in 
#onPartitionsRevoked). If there aren't any lost partitions, there's no reason 
to invoke this callback (and it would be misleading to do so).
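
The difference between the two callbacks can be sketched as follows, assuming a 
listener that tracks processed offsets itself. Everything here is hypothetical: 
`OffsetTrackingListener` is not a Kafka class, partitions are plain strings, 
and the `committed` map merely stands in for a real offset commit.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical listener for illustration; the "committed" map stands in for
// an actual offset commit against the group coordinator.
final class OffsetTrackingListener {
    private final Map<String, Long> processed = new HashMap<>();
    private final Map<String, Long> committed = new HashMap<>();

    // Remember the latest processed offset for a partition.
    void record(String partition, long offset) {
        processed.put(partition, offset);
    }

    // Revoked partitions are still owned at this point, so offsets can be committed.
    void onPartitionsRevoked(Collection<String> partitions) {
        for (String partition : partitions) {
            Long offset = processed.remove(partition);
            if (offset != null) {
                committed.put(partition, offset);
            }
        }
    }

    // Lost partitions are no longer owned: drop local state without committing.
    void onPartitionsLost(Collection<String> partitions) {
        for (String partition : partitions) {
            processed.remove(partition);
        }
    }

    Map<String, Long> committed() {
        return committed;
    }

    Map<String, Long> processed() {
        return processed;
    }

    public static void main(String[] args) {
        OffsetTrackingListener listener = new OffsetTrackingListener();
        listener.record("orders-0", 5L);
        listener.record("orders-1", 7L);
        listener.onPartitionsRevoked(List.of("orders-0"));
        listener.onPartitionsLost(List.of("orders-1"));
        System.out.println("committed=" + listener.committed());
    }
}
```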

My understanding is that there is no "eager" or "cooperative" protocol in the 
new consumer; it's an entirely new protocol, so I would assume you're not 
obligated to maintain compatibility for existing ConsumerRebalanceListener 
implementations. In that case, it probably does not make sense to guarantee 
that #onPartitionsAssigned is invoked on every rebalance regardless, even if no 
new partitions are added. I'm not super familiar with the KIP-848 
implementation details, but I would assume that users can still use the 
ConsumerPartitionAssignor callbacks to effectively detect the start and end of 
a rebalance (via #subscriptionUserData and #onAssignment).

Of course, if you intend to change the behavior in a way that would affect the 
old consumer as well, then you'll need to give Kafka Streams time to adopt a 
new approach since we currently still rely on #onPartitionsAssigned to notify 
us when a rebalance ends. I'm pretty sure we don't plan on using the new 
consumer right away though, since we'll need to make a number of changes like 
this one before we can do so.
