[ 
https://issues.apache.org/jira/browse/KAFKA-16312?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16312:
------------------------------
    Description: 
There is a difference between the {{LegacyKafkaConsumer}} and 
{{AsyncKafkaConsumer}} with respect to when the 
{{ConsumerRebalanceListener.onPartitionsAssigned()}} method is invoked.

For example, with {{onPartitionsAssigned()}}:

* {{LegacyKafkaConsumer}}: the listener method is invoked when the consumer 
joins the group, even if that consumer was not assigned any partitions. In this 
case it's passed an empty list.
* {{AsyncKafkaConsumer}}: the listener method is invoked after the 
consumer joins the group only if it has been assigned partitions.
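
The two policies above can be modeled with a small, self-contained sketch. 
Everything in it is hypothetical and for illustration only: 
{{AssignmentCallbackSketch}}, {{AssignedListener}}, {{notifyAlways()}} and 
{{notifyIfNonEmpty()}} are not Kafka classes or methods, and partitions are 
represented as plain strings. It only mirrors the behavior described in the 
two bullets, for three members sharing a single partition.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model for illustration only; these are not Kafka classes.
class AssignmentCallbackSketch {

    // Stand-in for ConsumerRebalanceListener.onPartitionsAssigned().
    interface AssignedListener {
        void onPartitionsAssigned(List<String> partitions);
    }

    // Models the behavior described for LegacyKafkaConsumer: the callback
    // runs after every join, even when the assignment is empty.
    static void notifyAlways(List<String> assigned, AssignedListener listener) {
        listener.onPartitionsAssigned(assigned);
    }

    // Models the behavior described for AsyncKafkaConsumer: the callback
    // runs only when at least one partition was assigned.
    static void notifyIfNonEmpty(List<String> assigned, AssignedListener listener) {
        if (!assigned.isEmpty()) {
            listener.onPartitionsAssigned(assigned);
        }
    }

    // Applies one policy to three members that share a single partition
    // and returns the lines the listeners recorded.
    static List<String> run(boolean alwaysNotify) {
        List<List<String>> assignments = List.of(
                List.<String>of(), List.of("test_topic-0"), List.<String>of());
        List<String> log = new ArrayList<>();
        for (int i = 0; i < assignments.size(); i++) {
            final int node = i + 1;
            AssignedListener listener = partitions ->
                    log.add("node " + node + " assigned " + partitions);
            if (alwaysNotify) {
                notifyAlways(assignments.get(i), listener);
            } else {
                notifyIfNonEmpty(assignments.get(i), listener);
            }
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println("always notify:  " + run(true));
        System.out.println("non-empty only: " + run(false));
    }
}
```

Under the first policy all three members record a callback, two of them with 
an empty list. Under the second policy only the member that owns 
{{test_topic-0}} records one.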

This difference is affecting the system tests. The system tests use a Java 
class named {{VerifiableConsumer}} which uses a {{ConsumerRebalanceListener}} 
that logs when the callbacks are invoked. The system tests then read from that 
log to determine when the callbacks are invoked. This coordination is used by 
the system tests to determine the lifecycle and status of the consumers.

The system tests rely heavily on the listener behavior of the 
{{LegacyKafkaConsumer}}. It invokes the {{onPartitionsAssigned()}} method when 
the consumer joins the group, and the system tests use that to determine when 
the consumer is actively a member of the group. This validation of membership 
is used as an assertion throughout the consumer-related tests.
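
To make the dependency concrete, here is a rough sketch of a membership 
check driven by log output. It is an assumption-laden illustration, not the 
actual system test code (which lives in Python and may use a different 
mechanism): {{JoinDetectionSketch}}, {{hasJoined()}} and {{countJoined()}} 
are hypothetical names, and the only real input is the log text quoted in 
this report.

```java
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration; not part of the Kafka system tests.
class JoinDetectionSketch {

    // Marker text taken from the log excerpts quoted in this report.
    static final String MARKER = "Adding newly assigned partitions:";

    static final String SUFFIX =
            " (org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerInvoker)";

    // A node counts as joined if any of its log lines contains the marker,
    // whether or not partition names follow it.
    static boolean hasJoined(List<String> logLines) {
        return logLines.stream().anyMatch(line -> line.contains(MARKER));
    }

    // Number of nodes whose logs show the assignment callback.
    static long countJoined(Map<String, List<String>> logsByNode) {
        return logsByNode.values().stream()
                .filter(JoinDetectionSketch::hasJoined)
                .count();
    }

    // Log lines as reported for the LegacyKafkaConsumer run.
    static Map<String, List<String>> legacyLogs() {
        return Map.of(
                "node1", List.of("[2024-02-15 00:43:52,400] INFO " + MARKER + " " + SUFFIX),
                "node2", List.of("[2024-02-15 00:43:52,401] INFO " + MARKER + " test_topic-0" + SUFFIX),
                "node3", List.of("[2024-02-15 00:43:52,399] INFO " + MARKER + " " + SUFFIX));
    }

    // Log lines as reported for the AsyncKafkaConsumer run; nodes 2 and 3
    // logged nothing for the callback.
    static Map<String, List<String>> asyncLogs() {
        return Map.of(
                "node1", List.of("[2024-02-15 01:15:46,576] INFO " + MARKER + " test_topic-0" + SUFFIX),
                "node2", List.<String>of(),
                "node3", List.<String>of());
    }

    public static void main(String[] args) {
        System.out.println("legacy joined: " + countJoined(legacyLogs()));
        System.out.println("async joined:  " + countJoined(asyncLogs()));
    }
}
```

A check of this shape sees three joined members with the legacy consumer but 
only one with the new consumer, so an assertion that waits for all three 
members would never be satisfied.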

In the system test I'm executing from {{consumer_test.py}}, there's a test that 
creates three consumers to read from a single topic with a single partition. 
It's a bit of an oddball test, but it demonstrates the issue.

Here are the logs pulled from the test run when executed using the 
{{LegacyKafkaConsumer}}:

Node 1:

{code:java}
[2024-02-15 00:43:52,400] INFO Adding newly assigned partitions:  
(org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerInvoker){code}

Node 2:

{code:java}
[2024-02-15 00:43:52,401] INFO Adding newly assigned partitions: test_topic-0 
(org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerInvoker){code}

Node 3:

{code:java}
[2024-02-15 00:43:52,399] INFO Adding newly assigned partitions:  
(org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerInvoker){code}

Here are the logs when executing the same test using the {{AsyncKafkaConsumer}}:

Node 1:

{code:java}
[2024-02-15 01:15:46,576] INFO Adding newly assigned partitions: test_topic-0 
(org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerInvoker){code}

Node 2:

{code:java}n/a{code}

Node 3:

{code:java}n/a{code}

As a result of this difference, the existing system tests do not work with the 
new consumer. More importantly, this change in behavior may adversely affect 
existing users.



> ConsumerRebalanceListener.onPartitionsAssigned should be called after 
> joining, even if empty
> --------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-16312
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16312
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, consumer
>    Affects Versions: 3.7.0
>            Reporter: Kirk True
>            Assignee: Lianet Magrans
>            Priority: Major
>              Labels: kip-848-client-support
>             Fix For: 3.8.0
>
>
> There is a difference between the {{LegacyKafkaConsumer}} and 
> {{AsyncKafkaConsumer}} with respect to when the 
> {{ConsumerRebalanceListener.onPartitionsAssigned()}} method is invoked.
> For example, with {{onPartitionsAssigned()}}:
> * {{LegacyKafkaConsumer}}: the listener method is invoked when the consumer 
> joins the group, even if that consumer was not assigned any partitions. In 
> this case it's passed an empty list.
> * {{AsyncKafkaConsumer}}: the listener method is invoked after the 
> consumer joins the group only if it has been assigned partitions.
> This difference is affecting the system tests. The system tests use a Java 
> class named {{VerifiableConsumer}} which uses a {{ConsumerRebalanceListener}} 
> that logs when the callbacks are invoked. The system tests then read from 
> that log to determine when the callbacks are invoked. This coordination is 
> used by the system tests to determine the lifecycle and status of the 
> consumers.
> The system tests rely heavily on the listener behavior of the 
> {{LegacyKafkaConsumer}}. It invokes the {{onPartitionsAssigned()}} method 
> when the consumer joins the group, and the system tests use that to determine 
> when the consumer is actively a member of the group. This validation of 
> membership is used as an assertion throughout the consumer-related tests.
> In the system test I'm executing from {{consumer_test.py}}, there's a test 
> that creates three consumers to read from a single topic with a single 
> partition. It's a bit of an oddball test, but it demonstrates the issue.
> Here are the logs pulled from the test run when executed using the 
> {{LegacyKafkaConsumer}}:
> Node 1:
> {code:java}
> [2024-02-15 00:43:52,400] INFO Adding newly assigned partitions:  
> (org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerInvoker){code}
> Node 2:
> {code:java}
> [2024-02-15 00:43:52,401] INFO Adding newly assigned partitions: test_topic-0 
> (org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerInvoker){code}
> Node 3:
> {code:java}
> [2024-02-15 00:43:52,399] INFO Adding newly assigned partitions:  
> (org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerInvoker){code}
> Here are the logs when executing the same test using the 
> {{AsyncKafkaConsumer}}:
> Node 1:
> {code:java}
> [2024-02-15 01:15:46,576] INFO Adding newly assigned partitions: test_topic-0 
> (org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerInvoker){code}
> Node 2:
> {code:java}n/a{code}
> Node 3:
> {code:java}n/a{code}
> As a result of this difference, the existing system tests do not work with 
> the new consumer. More importantly, this change in behavior may adversely 
> affect existing users.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
