[
https://issues.apache.org/jira/browse/KAFKA-15843?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17787681#comment-17787681
]
A. Sophie Blee-Goldman commented on KAFKA-15843:
Hey [~lianetm] I worked on the old ConsumerRebalanceListener a lot and can
provide some context here. The reason #onPartitionsAssigned is still called on
an empty set of partitions is largely historical, and the tl;dr is that it's
probably ok to change this behavior in the new consumer if it won't impact the
older one.
For some background, in the old days of eager rebalancing (which is still the
default protocol in the original consumer client), we would always invoke both
#onPartitionsRevoked and #onPartitionsAssigned at the start and end of a
rebalance, respectively. And since all partitions are revoked and re-assigned
with eager rebalancing, there (usually) was a non-empty set of partitions
passed into each of these.
Then came incremental cooperative rebalancing: we no longer revoked &
reassigned all the partitions and instead acted only on the incremental change
in partition assignment. So #onPartitionsRevoked only gets the subset of
partitions that are being migrated to a different consumer, and
#onPartitionsAssigned only gets newly-added partitions. Also, with the
cooperative protocol, #onPartitionsRevoked would be invoked at the _end_ of a
rebalance, rather than at the beginning.
However, we still had to maintain compatibility across the two protocols for
those implementing ConsumerRebalanceListener. And it was common to use the
rebalance listener not just to listen in on the partition assignment, but to
notify about the start and end of a rebalance. Therefore we decided to
guarantee that #onPartitionsAssigned would still be invoked at the end of every
rebalance, in case users were relying on this callback to detect the end of a
rebalance. However, since #onPartitionsRevoked is no longer even invoked at the
start of a cooperative rebalance, it can't be used to detect the start of one
anymore and there was no reason to continue invoking it on every rebalance
unless there were actually some partitions that were revoked. You'll notice
that if the eager protocol is still enabled, the #onPartitionsRevoked callback
actually is still invoked regardless of whether there's a non-empty set of
partitions passed into it or not.
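
To make the difference concrete, here is a minimal sketch of the callback behavior described above. It is a simplified model, not the actual consumer code: the class RebalanceCallbackSketch, the Protocol enum, and the callbacks() helper are hypothetical names, and partitions are plain integers rather than TopicPartition objects.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical model of which listener callbacks fire in one rebalance.
// This is NOT Kafka's implementation; it only mirrors the rules described above.
class RebalanceCallbackSketch {
    enum Protocol { EAGER, COOPERATIVE }

    // Returns the callbacks that would fire, in order, each rendered as
    // "<callback name><sorted partitions>".
    static List<String> callbacks(Protocol protocol, Set<Integer> owned, Set<Integer> assigned) {
        List<String> calls = new ArrayList<>();
        if (protocol == Protocol.EAGER) {
            // Eager: everything owned is revoked, and the callback fires
            // even when that set is empty.
            calls.add("onPartitionsRevoked" + new TreeSet<>(owned));
            // The full new assignment is then handed back at the end.
            calls.add("onPartitionsAssigned" + new TreeSet<>(assigned));
        } else {
            // Cooperative: act only on the incremental change.
            Set<Integer> revoked = new TreeSet<>(owned);
            revoked.removeAll(assigned);
            Set<Integer> added = new TreeSet<>(assigned);
            added.removeAll(owned);
            // Revoked is skipped entirely when nothing is being taken away.
            if (!revoked.isEmpty()) {
                calls.add("onPartitionsRevoked" + revoked);
            }
            // Assigned is always invoked, possibly with an empty set, so that
            // listeners can still detect the end of the rebalance.
            calls.add("onPartitionsAssigned" + added);
        }
        return calls;
    }

    public static void main(String[] args) {
        // Assignment unchanged: cooperative reports only an empty "assigned".
        System.out.println(callbacks(Protocol.COOPERATIVE, Set.of(0, 1), Set.of(0, 1)));
        // prints [onPartitionsAssigned[]]
        System.out.println(callbacks(Protocol.EAGER, Set.of(0, 1), Set.of(0, 1)));
        // prints [onPartitionsRevoked[0, 1], onPartitionsAssigned[0, 1]]
    }
}
```

The empty onPartitionsAssigned call in the cooperative case is exactly the behavior this ticket is reviewing.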
#onPartitionsLost is a bit of a special case, since (a) it was only added
around the time cooperative rebalancing was implemented, so there was no old
behavior for us to maintain compatibility with, and (b) it doesn't happen
during a regular rebalance but instead only to notify the rebalance listener of
a special case, i.e. that it has lost ownership of these partitions (and for
that exact reason cannot commit offsets for them, as would normally occur in an
#onPartitionsRevoked). If there aren't any lost partitions, there's no reason
to invoke this callback (and it would be misleading to do so).
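
As an illustration of why a listener treats the two cases differently, here is a minimal sketch under stated assumptions. The Listener interface is a local stand-in for the three ConsumerRebalanceListener callbacks (with integer partitions), and TrackingListener with its in-memory maps is hypothetical; a real listener would commit through the consumer instead.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: revoked partitions are committed before release,
// lost partitions are only cleaned up. Not taken from the Kafka codebase.
class LostVsRevokedSketch {
    // Local stand-in for the three rebalance listener callbacks.
    interface Listener {
        void onPartitionsAssigned(Collection<Integer> partitions);
        void onPartitionsRevoked(Collection<Integer> partitions);
        void onPartitionsLost(Collection<Integer> partitions);
    }

    static class TrackingListener implements Listener {
        // In-memory progress for partitions this member currently owns.
        final Map<Integer, Long> positions = new HashMap<>();
        // Stand-in for offsets committed to the group coordinator.
        final Map<Integer, Long> committed = new HashMap<>();

        @Override
        public void onPartitionsAssigned(Collection<Integer> partitions) {
            // Resume each new partition from its last committed offset (or 0).
            for (int p : partitions) {
                positions.putIfAbsent(p, committed.getOrDefault(p, 0L));
            }
        }

        @Override
        public void onPartitionsRevoked(Collection<Integer> partitions) {
            // Still the owner at this point, so committing is safe.
            for (int p : partitions) {
                Long position = positions.remove(p);
                if (position != null) {
                    committed.put(p, position);
                }
            }
        }

        @Override
        public void onPartitionsLost(Collection<Integer> partitions) {
            // Ownership is already gone: drop local state, commit nothing.
            for (int p : partitions) {
                positions.remove(p);
            }
        }
    }

    public static void main(String[] args) {
        TrackingListener listener = new TrackingListener();
        listener.onPartitionsAssigned(List.of(0, 1));
        listener.positions.put(0, 5L);
        listener.positions.put(1, 7L);
        listener.onPartitionsRevoked(List.of(0)); // offset 5 is committed
        listener.onPartitionsLost(List.of(1));    // offset 7 is discarded
        System.out.println(listener.committed);   // prints {0=5}
    }
}
```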
My understanding is that there is no "eager" or "cooperative" protocol in the
new consumer, it's an entirely new protocol, so I would assume you're not
obligated to maintain compatibility for existing ConsumerRebalanceListener
implementations. In that case, it probably does not make sense to guarantee
that #onPartitionsAssigned is invoked on every rebalance regardless, even if no
new partitions are added. I'm not super familiar with the KIP-848
implementation details, but I would assume that users can still use the
ConsumerPartitionAssignor callbacks to effectively detect the start and end of
a rebalance (via #subscriptionUserData and #onAssignment).
Of course, if you intend to change the behavior in a way that would affect the
old consumer as well, then you'll need to give Kafka Streams time to adopt a
new approach since we currently still rely on #onPartitionsAssigned to notify
us when a rebalance ends. I'm pretty sure we don't plan on using the new
consumer right away though, since we'll need to make a number of changes like
this one before we can do so.
> Review consumer onPartitionsAssigned called with empty partitions
> -
>
> Key: KAFKA-15843
> URL: https://issues.apache.org/jira/browse/KAFKA-15843
> Project: Kafka
> Issue Type: Sub-task
> Components: clients, consumer
> Reporter: Lianet Magrans
> Priority: Major
> Labels: kip-848, kip-848-client-support, kip-848-e2e,
> kip-848-preview
>
> Legacy coordinator triggers onPartitionsAssigned with empty assignment (which
> is not the case when triggering onPartitionsRevoked or Lost). This is the
> behaviour of the legacy coordinator, and the new consumer implementation
> maintains the same principle. We should review this to fully understand if it
> is really needed to call