Jonathan Haapala created KAFKA-15998:
----------------------------------------

             Summary: EAGER rebalance onPartitionsAssigned() called with no 
previous onPartitionsLost() nor onPartitionsRevoked()
                 Key: KAFKA-15998
                 URL: https://issues.apache.org/jira/browse/KAFKA-15998
             Project: Kafka
          Issue Type: Bug
          Components: consumer
    Affects Versions: 3.4.0
            Reporter: Jonathan Haapala


I ran into a case where {{onPartitionsAssigned()}} was called without 
{{onPartitionsRevoked()}} being called first, and there is no indication that 
{{onPartitionsLost()}} was called or had any reason to be called. We are using 
the *EAGER* rebalance protocol and the *StickyAssignor* on Kafka 3.4.0.

Our services rely on this part of the {{onPartitionsRevoked()}} API contract:
{quote}In eager rebalancing, it will always be called at the start of a 
rebalance and after the consumer stops fetching data.{quote}
We track partition states internally with a state machine and rely on these 
callbacks to assert which state we expect to be in. So when a partition is 
Revoked and then re-Assigned, we know that we kept ownership. Moreover, if we 
are assigned partitions under EAGER rebalancing, we expect the entire assignment 
to be passed to {{onPartitionsAssigned()}}. Since {{onPartitionsRevoked()}} is 
always called at the start of a rebalance and the EAGER protocol always revokes 
the entire assignment, by the time we reach {{onPartitionsAssigned()}} nothing 
should be assigned from the consumer's point of view, so the entire assignment 
is newly added.
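A minimal sketch of the kind of state check described above. {{EagerPartitionTracker}} and its methods are hypothetical names, not Kafka APIs; the sketch tracks partition numbers of a single topic as integers instead of {{TopicPartition}}, and assumes it would be driven from a {{ConsumerRebalanceListener}}. It encodes the two EAGER expectations: a revoke (or loss) gives up the whole current assignment, and an assignment only arrives when nothing is owned.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper (not a Kafka API). In a real service these methods would be
// called from ConsumerRebalanceListener.onPartitionsRevoked / onPartitionsLost /
// onPartitionsAssigned, using TopicPartition instead of Integer partition numbers.
final class EagerPartitionTracker {
    private final Set<Integer> owned = new TreeSet<>();

    // Under EAGER, a revoke or loss is expected to cover the whole current assignment.
    void onRevokedOrLost(Collection<Integer> partitions) {
        Set<Integer> incoming = new TreeSet<>(partitions);
        if (!incoming.equals(owned)) {
            throw new IllegalStateException(
                "expected to give up " + owned + " but got " + incoming);
        }
        owned.clear();
    }

    // Under EAGER, nothing should still be owned when an assignment arrives,
    // so the assigned collection should be the complete new assignment.
    void onAssigned(Collection<Integer> partitions) {
        if (!owned.isEmpty()) {
            throw new IllegalStateException(
                "assigned " + partitions + " while still owning " + owned);
        }
        owned.addAll(partitions);
    }

    // Read-only view of what the tracker believes this consumer owns.
    Set<Integer> owned() {
        return Collections.unmodifiableSet(owned);
    }
}
```

Fed the sequence in the logs below (16 partitions assigned, no revoke, then an assignment callback that lists no newly added partitions), {{onAssigned}} would throw, because the tracker still owns the 16 partitions from the earlier generation.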

However, we recently ran into a situation where we received an assignment while 
the consumer's existing assignment was non-empty:
||Pod||Message||
|aggregator-ff95b6cf-r2jkm|2023-12-07 23:02:25,715\{UTC} 
[KafkaConsumerAutoService] INFO  o.a.k.c.c.i.ConsumerCoordinator - [Consumer 
clientId=consumer.metric-data-points.metric-aggregator, 
groupId=metric-aggregator] Notifying assignor about the {*}new 
Assignment{*}(partitions=[topic-26, topic-44, topic-60, topic-71, topic-78, 
topic-82, topic-88, topic-101, topic-105, topic-109, topic-113, topic-117, 
topic-123, topic-130, topic-137, topic-141])|
|aggregator-ff95b6cf-r2jkm|2023-12-07 23:02:25,715\{UTC} 
[KafkaConsumerAutoService] INFO  o.a.k.c.c.i.ConsumerCoordinator - [Consumer 
clientId=consumer.metric-data-points.metric-aggregator, 
groupId=metric-aggregator] Adding {*}newly assigned partitions{*}: topic-26, 
topic-44, topic-60, topic-71, topic-78, topic-82, topic-88, topic-101, 
topic-105, topic-109, topic-113, topic-117, topic-123, topic-130, topic-137, 
topic-141|
|aggregator-ff95b6cf-r2jkm|2023-12-07 23:02:31,923\{UTC} 
[kafka-coordinator-heartbeat-thread \| metric-aggregator] INFO  
o.a.k.c.c.i.ConsumerCoordinator - [Consumer 
clientId=consumer.metric-data-points.metric-aggregator, 
groupId=metric-aggregator] *Request joining group* due to: group is already 
rebalancing|
|aggregator-ff95b6cf-r2jkm|2023-12-07 23:02:32,132\{UTC} 
[KafkaConsumerAutoService] INFO  o.a.k.c.c.i.ConsumerCoordinator - [Consumer 
clientId=consumer.metric-data-points.metric-aggregator, 
groupId=metric-aggregator] Successfully joined group with generation 
Generation\{generationId=12417, 
memberId='consumer.metric-data-points.metric-aggregator-a43be1e2-eba1-444c-96dd-ccb52cdba223',
 protocol='sticky'}|
|aggregator-ff95b6cf-r2jkm|2023-12-07 23:02:32,134\{UTC} 
[KafkaConsumerAutoService] INFO  o.a.k.c.c.i.ConsumerCoordinator - [Consumer 
clientId=consumer.metric-data-points.metric-aggregator, 
groupId=metric-aggregator] Successfully synced group in generation 
Generation{generationId={*}12417{*}, 
memberId='consumer.metric-data-points.metric-aggregator-a43be1e2-eba1-444c-96dd-ccb52cdba223',
 protocol='sticky'}|
|aggregator-ff95b6cf-r2jkm|2023-12-07 23:02:32,135\{UTC} 
[KafkaConsumerAutoService] INFO  o.a.k.c.c.i.ConsumerCoordinator - [Consumer 
clientId=consumer.metric-data-points.metric-aggregator, 
groupId=metric-aggregator] Notifying assignor about the {*}new 
Assignment{*}(partitions=[topic-26, topic-44, topic-60, topic-71, topic-78, 
topic-82, topic-88, topic-101, topic-105, topic-109, topic-113, topic-117])|
|aggregator-ff95b6cf-r2jkm|2023-12-07 23:02:32,135\{UTC} 
[KafkaConsumerAutoService] INFO  o.a.k.c.c.i.ConsumerCoordinator - [Consumer 
clientId=consumer.metric-data-points.metric-aggregator, 
groupId=metric-aggregator] Adding {*}newly assigned partitions{*}: |

Here you can see we are assigned partitions:

  26, 44, 60, 71, 78, 82, 88, 101, 105, 109, 113, 117, 123, 130, 137, 141

All of them are promptly reported as newly added when passed to 
{{onPartitionsAssigned()}}. About 6 seconds later (23:02:31,923) the heartbeat 
thread notices another rebalance and requests to join. The join succeeds about 
200 ms later, the sync completes 2 ms after that, and we then get a new 
assignment:

  26, 44, 60, 71, 78, 82, 88, 101, 105, 109, 113, 117

This is a subset of the partitions we were previously assigned, missing 123, 
130, 137, and 141. Because {{onPartitionsRevoked()}} was not called at the 
beginning of this rebalance, the consumer's current assignment still holds the 
old partitions instead of being empty, so it finds no newly assigned partitions.
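The arithmetic can be reproduced with two set differences, assuming (as described above) that the newly assigned partitions are the new assignment minus whatever the consumer currently owns. {{AssignmentDiff}} is a hypothetical helper, not Kafka code, and plain partition numbers stand in for {{TopicPartition}}s.

```java
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper illustrating the "newly assigned" computation as a set difference.
final class AssignmentDiff {
    // Partitions in the new assignment that the consumer does not already own.
    static Set<Integer> added(Set<Integer> owned, Set<Integer> assigned) {
        Set<Integer> result = new TreeSet<>(assigned);
        result.removeAll(owned);
        return result;
    }

    // Partitions the consumer owned that are absent from the new assignment.
    static Set<Integer> removed(Set<Integer> owned, Set<Integer> assigned) {
        Set<Integer> result = new TreeSet<>(owned);
        result.removeAll(assigned);
        return result;
    }

    public static void main(String[] args) {
        Set<Integer> gen12416 = Set.of(26, 44, 60, 71, 78, 82, 88, 101,
                                       105, 109, 113, 117, 123, 130, 137, 141);
        Set<Integer> gen12417 = Set.of(26, 44, 60, 71, 78, 82, 88, 101,
                                       105, 109, 113, 117);
        // Expected EAGER path: the revoke empties the owned set first.
        System.out.println(added(Set.of(), gen12417));
        // prints [26, 44, 60, 71, 78, 82, 88, 101, 105, 109, 113, 117]
        // Observed path: no revoke, so generation 12416's partitions are still owned.
        System.out.println(added(gen12416, gen12417));   // prints []
        System.out.println(removed(gen12416, gen12417)); // prints [123, 130, 137, 141]
    }
}
```

With an empty owned set, which is what the EAGER revoke should leave behind, all 12 partitions of generation 12417 count as newly added; with the generation 12416 assignment still in place, none do, matching the empty list in the last log line.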

Using this diagram from 
[KIP-429|https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol#KIP429:KafkaConsumerIncrementalRebalanceProtocol-ConsumerRebalanceProtocol:Stop-The-WorldEffect]
 as a visual guide, it seems we sent the JoinGroup and joined successfully, but 
then skipped straight to the SyncGroup and got our assignment. 
!https://cwiki.apache.org/confluence/download/attachments/103090108/Rebalance%20Today.jpg?version=1&modificationDate=1554837450000&api=v2!
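The same ordering can be checked mechanically against the log timeline. This sketch reduces each log line to a hypothetical event label ({{RebalanceEvent}} and {{EagerOrderCheck}} are illustrative names, not Kafka types) and reports the first join that completes with no revoke since the previous assignment. It only assumes that, under EAGER, a revoke should happen somewhere between one assignment and the next successful join.

```java
import java.util.List;

// Hypothetical labels for the log lines in this report; not Kafka types.
enum RebalanceEvent { REVOKED, JOIN_REQUESTED, JOINED, SYNCED, ASSIGNED }

final class EagerOrderCheck {
    // Returns the index of the first JOINED event with no REVOKED event since the
    // previous ASSIGNED event, or -1 if every join was preceded by a revoke.
    static int firstJoinWithoutRevoke(List<RebalanceEvent> trace) {
        boolean revokedSinceAssign = true; // before the first assignment nothing is owned
        for (int i = 0; i < trace.size(); i++) {
            switch (trace.get(i)) {
                case ASSIGNED:
                    revokedSinceAssign = false;
                    break;
                case REVOKED:
                    revokedSinceAssign = true;
                    break;
                case JOINED:
                    if (!revokedSinceAssign) {
                        return i;
                    }
                    break;
                default:
                    break;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // Timeline from the pod aggregator-ff95b6cf-r2jkm logs, 23:02:25 to 23:02:32.
        List<RebalanceEvent> observed = List.of(
            RebalanceEvent.ASSIGNED, RebalanceEvent.JOIN_REQUESTED,
            RebalanceEvent.JOINED, RebalanceEvent.SYNCED, RebalanceEvent.ASSIGNED);
        System.out.println(firstJoinWithoutRevoke(observed)); // prints 2
    }
}
```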

Here are the assignment logs from the group leader (pod 
aggregator-ff95b6cf-nzkrx) for the initial assignment and for the assignment 
without a revoke. They are sequential generations, 12416 and 12417, so no 
generation was missed in between.
||Pod||Message||
|aggregator-ff95b6cf-nzkrx|2023-12-07 23:02:25,709\{UTC} 
[KafkaConsumerAutoService] INFO  o.a.k.c.c.i.ConsumerCoordinator - [Consumer 
clientId=consumer.metric-data-points.metric-aggregator, 
groupId=metric-aggregator] Finished assignment for group at generation 
{*}12416{*}: 
\{consumer.metric-data-points.metric-aggregator-8d718b78-9b7e-4803-afda-49b1c35e2654=Assignment(partitions=[topic-28,
 topic-30, topic-57, topic-77, topic-128, topic-129, topic-133, topic-134, 
topic-136, topic-27, topic-56, topic-119, topic-22, topic-65, topic-43, 
topic-64]), 
consumer.metric-data-points.metric-aggregator-ab7e7027-3ca9-4dac-bb09-71958b515ab6=Assignment(partitions=[topic-35,
 topic-45, topic-62, topic-72, topic-79, topic-83, topic-90, topic-102, 
topic-106, topic-110, topic-114, topic-118, topic-124, topic-131, topic-138, 
topic-142]), 
consumer.metric-data-points.metric-aggregator-da772740-a386-491a-aa6d-69b26550dbf9=Assignment(partitions=[topic-37,
 topic-49, topic-66, topic-73, topic-80, topic-85, topic-95, topic-103, 
topic-107, topic-111, topic-115, topic-120, topic-125, topic-132, topic-139, 
topic-143]), 
consumer.metric-data-points.metric-aggregator-779655ef-3297-44d1-8edf-2e4b6c9b4f55=Assignment(partitions=[topic-7,
 topic-11, topic-14, topic-16, topic-21, topic-29, topic-38, topic-42, 
topic-52, topic-63, topic-69, topic-87, topic-91, topic-93, topic-96, 
topic-98]), 
consumer.metric-data-points.metric-aggregator-a43be1e2-eba1-444c-96dd-ccb52cdba223=Assignment(partitions=[topic-26,
 topic-44, topic-60, topic-71, topic-78, topic-82, topic-88, topic-101, 
topic-105, topic-109, topic-113, topic-117, topic-123, topic-130, topic-137, 
topic-141]), 
consumer.metric-data-points.metric-aggregator-d06d548e-1e7f-48fb-98a0-2f6072fa3a5b=Assignment(partitions=[topic-9,
 topic-13, topic-18, topic-20, topic-25, topic-32, topic-33, topic-34, 
topic-36, topic-47, topic-48, topic-50, topic-51, topic-53, topic-59, 
topic-61]), 
consumer.metric-data-points.metric-aggregator-841a72e3-7ae4-4734-9793-7e947c4fa0a4=Assignment(partitions=[topic-19,
 topic-39, topic-54, topic-70, topic-74, topic-81, topic-86, topic-99, 
topic-104, topic-108, topic-112, topic-116, topic-122, topic-127, topic-135, 
topic-140]), 
consumer.metric-data-points.metric-aggregator-939df068-59ea-43d7-bb48-3c6baf97023a=Assignment(partitions=[topic-8,
 topic-12, topic-15, topic-17, topic-23, topic-31, topic-40, topic-46, 
topic-55, topic-67, topic-75, topic-89, topic-92, topic-94, topic-97, 
topic-100]), 
consumer.metric-data-points.metric-aggregator-933f7c51-27f5-436f-9b56-640eb55d64b2=Assignment(partitions=[topic-0,
 topic-1, topic-2, topic-3, topic-4, topic-5, topic-6, topic-10, topic-68, 
topic-41, topic-58, topic-121, topic-24, topic-84, topic-76, topic-126])}|
|aggregator-ff95b6cf-nzkrx|2023-12-07 23:02:32,119\{UTC} 
[KafkaConsumerAutoService] INFO  o.a.k.c.c.i.ConsumerCoordinator - [Consumer 
clientId=consumer.metric-data-points.metric-aggregator, 
groupId=metric-aggregator] Finished assignment for group at generation 
{*}12417{*}: 
\{consumer.metric-data-points.metric-aggregator-8d718b78-9b7e-4803-afda-49b1c35e2654=Assignment(partitions=[topic-28,
 topic-30, topic-57, topic-77, topic-128, topic-129, topic-133, topic-134, 
topic-136, topic-27, topic-56, topic-119]), 
consumer.metric-data-points.metric-aggregator-ab7e7027-3ca9-4dac-bb09-71958b515ab6=Assignment(partitions=[topic-35,
 topic-45, topic-62, topic-72, topic-79, topic-83, topic-90, topic-102, 
topic-106, topic-110, topic-114, topic-118]), 
consumer.metric-data-points.metric-aggregator-76fd27b3-fb5b-49aa-af2a-b53150897cf9=Assignment(partitions=[topic-22,
 topic-51, topic-61, topic-76, topic-92, topic-96, topic-100, topic-124, 
topic-127, topic-132, topic-138, topic-141]), 
consumer.metric-data-points.metric-aggregator-9f6ab144-85a7-468c-9aa1-11f5445084b9=Assignment(partitions=[topic-43,
 topic-59, topic-65, topic-91, topic-94, topic-98, topic-123, topic-126, 
topic-131, topic-137, topic-140, topic-143]), 
consumer.metric-data-points.metric-aggregator-da772740-a386-491a-aa6d-69b26550dbf9=Assignment(partitions=[topic-37,
 topic-49, topic-66, topic-73, topic-80, topic-85, topic-95, topic-103, 
topic-107, topic-111, topic-115, topic-120]), 
consumer.metric-data-points.metric-aggregator-779655ef-3297-44d1-8edf-2e4b6c9b4f55=Assignment(partitions=[topic-7,
 topic-11, topic-14, topic-16, topic-21, topic-29, topic-38, topic-42, 
topic-52, topic-63, topic-69, topic-87]), 
consumer.metric-data-points.metric-aggregator-a43be1e2-eba1-444c-96dd-ccb52cdba223=Assignment(partitions=[topic-26,
 topic-44, topic-60, topic-71, topic-78, topic-82, topic-88, topic-101, 
topic-105, topic-109, topic-113, topic-117]), 
consumer.metric-data-points.metric-aggregator-d06d548e-1e7f-48fb-98a0-2f6072fa3a5b=Assignment(partitions=[topic-9,
 topic-13, topic-18, topic-20, topic-25, topic-32, topic-33, topic-34, 
topic-36, topic-47, topic-48, topic-50]), 
consumer.metric-data-points.metric-aggregator-841a72e3-7ae4-4734-9793-7e947c4fa0a4=Assignment(partitions=[topic-19,
 topic-39, topic-54, topic-70, topic-74, topic-81, topic-86, topic-99, 
topic-104, topic-108, topic-112, topic-116]), 
consumer.metric-data-points.metric-aggregator-939df068-59ea-43d7-bb48-3c6baf97023a=Assignment(partitions=[topic-8,
 topic-12, topic-15, topic-17, topic-23, topic-31, topic-40, topic-46, 
topic-55, topic-67, topic-75, topic-89]), 
consumer.metric-data-points.metric-aggregator-91b1e207-0978-430e-9393-679ac44647b8=Assignment(partitions=[topic-24,
 topic-53, topic-64, topic-84, topic-93, topic-97, topic-122, topic-125, 
topic-130, topic-135, topic-139, topic-142]), 
consumer.metric-data-points.metric-aggregator-933f7c51-27f5-436f-9b56-640eb55d64b2=Assignment(partitions=[topic-0,
 topic-1, topic-2, topic-3, topic-4, topic-5, topic-6, topic-10, topic-68, 
topic-41, topic-58, topic-121])}|
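Both generations cover topic-0 through topic-143 (144 partitions). Generation 12416 has 9 members with 16 partitions each; generation 12417 has 12 members with 12 each, the three new ones being the members whose UUIDs start with 76fd27b3, 9f6ab144 and 91b1e207. The arithmetic below is not the StickyAssignor's algorithm, only a check (with a hypothetical {{BalancedShare}} helper) that the numbers in the logs are consistent with an even spread: each of the 9 existing members gives up 4 partitions, which is exactly what the 3 new members take.

```java
// Hypothetical helper: per-member sizes of an evenly balanced assignment.
final class BalancedShare {
    // Each member gets partitions / members; the first (partitions % members)
    // members get one extra partition.
    static int[] sizes(int partitions, int members) {
        if (members <= 0) {
            throw new IllegalArgumentException("members must be positive");
        }
        int[] out = new int[members];
        int base = partitions / members;
        int extra = partitions % members;
        for (int i = 0; i < members; i++) {
            out[i] = base + (i < extra ? 1 : 0);
        }
        return out;
    }

    public static void main(String[] args) {
        int partitions = 144;                  // topic-0 .. topic-143
        int before = sizes(partitions, 9)[0];  // generation 12416: 9 members
        int after = sizes(partitions, 12)[0];  // generation 12417: 12 members
        int releasedTotal = 9 * (before - after);
        int takenByNewMembers = 3 * after;
        System.out.println(before + " -> " + after);                    // prints 16 -> 12
        System.out.println(releasedTotal + " == " + takenByNewMembers); // prints 36 == 36
    }
}
```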



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
