Bojan Blagojevic created KAFKA-14639:
----------------------------------------

             Summary: Kafka CooperativeStickyAssignor revokes/assigns partition 
in one rebalance cycle
                 Key: KAFKA-14639
                 URL: https://issues.apache.org/jira/browse/KAFKA-14639
             Project: Kafka
          Issue Type: Bug
          Components: clients, consumer
    Affects Versions: 3.2.1
            Reporter: Bojan Blagojevic


I have an application that runs 6 consumers in parallel. I am getting some 
unexpected results when I use {{CooperativeStickyAssignor}}. If I 
understand the mechanism correctly, when a consumer loses a partition in one 
rebalance cycle, that partition should not be assigned to another consumer 
until the next rebalance cycle.

This assumption is based on the 
[RebalanceProtocol|https://kafka.apache.org/31/javadoc/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.RebalanceProtocol.html]
 documentation and a few blog posts that describe the protocol, like [this 
one|https://www.confluent.io/blog/cooperative-rebalancing-in-kafka-streams-consumer-ksqldb/]
 on the Confluent blog.
{quote}The assignor should not reassign any owned partitions immediately, but 
instead may indicate consumers the need for partition revocation so that the 
revoked partitions can be reassigned to other consumers in the next rebalance 
event. This is designed for sticky assignment logic which attempts to minimize 
partition reassignment with cooperative adjustments.
{quote}
{quote}Any member that revoked partitions then rejoins the group, triggering a 
second rebalance so that its revoked partitions can be assigned. Until then, 
these partitions are unowned and unassigned.
{quote}
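To make that expectation concrete, here is a minimal sketch of the adjustment as I read it from the quotes above. It is plain Java with no Kafka dependency; the class {{CooperativeAdjustmentSketch}} and the method {{adjust}} are hypothetical names for illustration, not the actual assignor code. The owned and target sets are taken from the logs in this report, and partition 59 is treated as previously unowned only because its earlier owner is not visible in the excerpt.

```java
import java.util.*;

class CooperativeAdjustmentSketch {

    /**
     * owned:  member -> partitions it currently owns
     * target: member -> partitions the assignor would like it to own
     * Returns member -> partitions it may hold after this rebalance, under
     * the reading that a partition still owned by another member must be
     * revoked first and can only be handed over in a later rebalance.
     */
    static Map<String, Set<Integer>> adjust(Map<String, Set<Integer>> owned,
                                            Map<String, Set<Integer>> target) {
        // Index every partition by its current owner.
        Map<Integer, String> currentOwner = new HashMap<>();
        owned.forEach((member, parts) -> parts.forEach(p -> currentOwner.put(p, member)));

        Map<String, Set<Integer>> result = new TreeMap<>();
        target.forEach((member, parts) -> {
            Set<Integer> granted = new TreeSet<>();
            for (Integer p : parts) {
                String owner = currentOwner.get(p);
                // Grant only partitions that are unowned or already owned by this member.
                if (owner == null || owner.equals(member)) {
                    granted.add(p);
                }
            }
            result.put(member, granted);
        });
        return result;
    }

    public static void main(String[] args) {
        Map<String, Set<Integer>> owned = Map.of(
                "consumer-3", Set.of(26, 74),
                "consumer-4", Set.of(57));
        Map<String, Set<Integer>> target = Map.of(
                "consumer-3", Set.of(59),
                "consumer-4", Set.of(74));
        // prints {consumer-3=[59], consumer-4=[]}
        System.out.println(adjust(owned, target));
    }
}
```

Under this reading, consumer-4 would not receive partition 74 in the same generation in which consumer-3 is asked to revoke it.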
These are the logs from the application that uses 
{{protocol='cooperative-sticky'}}. In the same rebalance cycle 
({{generationId=640}}), {{partition 74}} moves from {{consumer-3}} to 
{{consumer-4}}. I omitted the lines that are logged by the other 4 
consumers.

Note that the log is in reverse chronological order (read bottom to top):
2022-12-14 11:18:24   1 --- [consumer-3] x.y.z.MyRebalanceHandler1   : New 
partition assignment: partition-59, seek to min common offset: 85120524
2022-12-14 11:18:24   1 --- [consumer-3] x.y.z.MyRebalanceHandler2   : 
Partitions [partition-59] assigned successfully
2022-12-14 11:18:24   1 --- [consumer-3] x.y.z.MyRebalanceHandler1   : 
Partitions assigned: [partition-59]
2022-12-14 11:18:24   1 --- [consumer-3] 
o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer 
clientId=partition-3-my-client-id-my-group-id, groupId=my-group-id] Adding 
newly assigned partitions: partition-59
2022-12-14 11:18:24   1 --- [consumer-3] 
o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer 
clientId=partition-3-my-client-id-my-group-id, groupId=my-group-id] Notifying 
assignor about the new Assignment(partitions=[partition-59])
2022-12-14 11:18:24   1 --- [consumer-3] 
o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer 
clientId=partition-3-my-client-id-my-group-id, groupId=my-group-id] Request 
joining group due to: need to revoke partitions [partition-26, partition-74] as 
indicated by the current assignment and re-join
2022-12-14 11:18:24   1 --- [consumer-3] x.y.z.MyRebalanceHandler2   : 
Partitions [partition-26, partition-74] revoked successfully
2022-12-14 11:18:24   1 --- [consumer-3] x.y.z.MyRebalanceHandler1   : Finished 
removing partition data
2022-12-14 11:18:24   1 --- [consumer-4] 
o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer 
clientId=partition-4-my-client-id-my-group-id, groupId=my-group-id] 
(Re-)joining group
2022-12-14 11:18:24   1 --- [consumer-4] x.y.z.MyRebalanceHandler1   : New 
partition assignment: partition-74, seek to min common offset: 107317730
2022-12-14 11:18:24   1 --- [consumer-4] x.y.z.MyRebalanceHandler2   : 
Partitions [partition-74] assigned successfully
2022-12-14 11:18:24   1 --- [consumer-4] x.y.z.MyRebalanceHandler1   : 
Partitions assigned: [partition-74]
2022-12-14 11:18:24   1 --- [consumer-4] 
o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer 
clientId=partition-4-my-client-id-my-group-id, groupId=my-group-id] Adding 
newly assigned partitions: partition-74
2022-12-14 11:18:24   1 --- [consumer-4] 
o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer 
clientId=partition-4-my-client-id-my-group-id, groupId=my-group-id] Notifying 
assignor about the new Assignment(partitions=[partition-74])
2022-12-14 11:18:24   1 --- [consumer-4] 
o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer 
clientId=partition-4-my-client-id-my-group-id, groupId=my-group-id] Request 
joining group due to: need to revoke partitions [partition-57] as indicated by 
the current assignment and re-join
2022-12-14 11:18:24   1 --- [consumer-4] x.y.z.MyRebalanceHandler2   : 
Partitions [partition-57] revoked successfully
2022-12-14 11:18:24   1 --- [consumer-4] x.y.z.MyRebalanceHandler1   : Finished 
removing partition data
2022-12-14 11:18:22   1 --- [consumer-3] x.y.z.MyRebalanceHandler1   : 
Partitions revoked: [partition-26, partition-74]
2022-12-14 11:18:22   1 --- [consumer-3] 
o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer 
clientId=partition-3-my-client-id-my-group-id, groupId=my-group-id] Revoke 
previously assigned partitions partition-26, partition-74
2022-12-14 11:18:22   1 --- [consumer-3] 
o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer 
clientId=partition-3-my-client-id-my-group-id, groupId=my-group-id] Updating 
assignment with\n\tAssigned partitions:                       
[partition-59]\n\tCurrent owned partitions:                  [partition-26, 
partition-74]\n\tAdded partitions (assigned - owned):       
[partition-59]\n\tRevoked partitions (owned - assigned):     [partition-26, 
partition-74]
2022-12-14 11:18:22   1 --- [consumer-3] 
o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer 
clientId=partition-3-my-client-id-my-group-id, groupId=my-group-id] 
Successfully synced group in generation Generation{generationId=640, 
memberId='partition-3-my-client-id-my-group-id-c31afd19-3f22-43cb-ad07-9088aa98d3af',
 protocol='cooperative-sticky'}
2022-12-14 11:18:22   1 --- [consumer-3] 
o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer 
clientId=partition-3-my-client-id-my-group-id, groupId=my-group-id] 
Successfully joined group with generation Generation{generationId=640, 
memberId='partition-3-my-client-id-my-group-id-c31afd19-3f22-43cb-ad07-9088aa98d3af',
 protocol='cooperative-sticky'}
2022-12-14 11:18:22   1 --- [consumer-4] x.y.z.MyRebalanceHandler1   : 
Partitions revoked: [partition-57]
2022-12-14 11:18:22   1 --- [consumer-4] 
o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer 
clientId=partition-4-my-client-id-my-group-id, groupId=my-group-id] Revoke 
previously assigned partitions partition-57
2022-12-14 11:18:22   1 --- [consumer-4] 
o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer 
clientId=partition-4-my-client-id-my-group-id, groupId=my-group-id] Updating 
assignment with\n\tAssigned partitions:                       
[partition-74]\n\tCurrent owned partitions:                  
[partition-57]\n\tAdded partitions (assigned - owned):       
[partition-74]\n\tRevoked partitions (owned - assigned):     [partition-57]
2022-12-14 11:18:21   1 --- [id-1] o.a.k.c.c.internals.ConsumerCoordinator  : 
[Consumer clientId=partition-4-my-client-id-my-group-id, groupId=my-group-id] 
Successfully synced group in generation Generation{generationId=640, 
memberId='partition-4-my-client-id-my-group-id-ae2af665-edc9-4a8e-b658-98372d142477',
 protocol='cooperative-sticky'}
2022-12-14 11:18:21   1 --- [consumer-4] 
o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer 
clientId=partition-4-my-client-id-my-group-id, groupId=my-group-id] 
Successfully joined group with generation Generation{generationId=640, 
memberId='partition-4-my-client-id-my-group-id-ae2af665-edc9-4a8e-b658-98372d142477',
 protocol='cooperative-sticky'}
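
The handoff can also be checked mechanically from the "Updating assignment" entries. The sketch below is plain Java with hypothetical names ({{SameGenerationHandoffCheck}}, {{Update}}, {{handoffs}}); it assumes the owned and assigned sets have already been extracted from the log by hand, and it flags any partition that one member revokes while another member adds it in the same generation.

```java
import java.util.*;

class SameGenerationHandoffCheck {

    // One member's "Updating assignment" entry, reduced to the fields the check needs.
    static final class Update {
        final String member;
        final int generation;
        final Set<String> owned;
        final Set<String> assigned;

        Update(String member, int generation, Set<String> owned, Set<String> assigned) {
            this.member = member;
            this.generation = generation;
            this.owned = owned;
            this.assigned = assigned;
        }

        // Corresponds to "Added partitions (assigned - owned)" in the coordinator log.
        Set<String> added() {
            Set<String> s = new TreeSet<>(assigned);
            s.removeAll(owned);
            return s;
        }

        // Corresponds to "Revoked partitions (owned - assigned)" in the coordinator log.
        Set<String> revoked() {
            Set<String> s = new TreeSet<>(owned);
            s.removeAll(assigned);
            return s;
        }
    }

    // Reports partitions revoked by one member and added by another in the same generation.
    static List<String> handoffs(List<Update> updates) {
        List<String> found = new ArrayList<>();
        for (Update from : updates) {
            for (Update to : updates) {
                if (from == to || from.generation != to.generation) {
                    continue;
                }
                for (String p : from.revoked()) {
                    if (to.added().contains(p)) {
                        found.add(p + ": " + from.member + " -> " + to.member
                                + " (generation " + from.generation + ")");
                    }
                }
            }
        }
        return found;
    }

    public static void main(String[] args) {
        // Values copied from the two "Updating assignment" entries above.
        List<Update> updates = List.of(
                new Update("consumer-3", 640,
                        Set.of("partition-26", "partition-74"), Set.of("partition-59")),
                new Update("consumer-4", 640,
                        Set.of("partition-57"), Set.of("partition-74")));
        // prints partition-74: consumer-3 -> consumer-4 (generation 640)
        handoffs(updates).forEach(System.out::println);
    }
}
```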

Is this expected?

Kafka client version is 3.2.1.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
