Jonathan Haapala created KAFKA-15998:
----------------------------------------
Summary: EAGER rebalance onPartitionsAssigned() called with no previous onPartitionsLost() nor onPartitionsRevoked()
Key: KAFKA-15998
URL: https://issues.apache.org/jira/browse/KAFKA-15998
Project: Kafka
Issue Type: Bug
Components: consumer
Affects Versions: 3.4.0
Reporter: Jonathan Haapala

I ran into a case where {{onPartitionsAssigned()}} was called without {{onPartitionsRevoked()}} being called first, and there is no indication that {{onPartitionsLost()}} was called or had any reason to be called. We are using the *EAGER* rebalance protocol and the *StickyAssignor* on Kafka 3.4.0.

Our services rely on the API contract of {{onPartitionsRevoked()}}:
{quote}In eager rebalancing, it will always be called at the start of a rebalance and after the consumer stops fetching data.{quote}

We internally keep track of partition states with a state machine, and rely on these callbacks to assert which states we expect to be in. So when a partition is Revoked and then re-Assigned, we know that we kept ownership. Moreover, when we are assigned partitions in EAGER rebalancing, we expect the entire assignment to be passed to {{onPartitionsAssigned()}}: if {{onPartitionsRevoked()}} is always called at the start of a rebalance and the EAGER protocol always revokes the entire assignment, then by the time we reach {{onPartitionsAssigned()}} nothing should be assigned from the consumer's point of view, and therefore the entire assignment is newly added.
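To make the expectation concrete, here is a minimal sketch of the bookkeeping we assume on the consumer side. It is not Kafka's actual code: the class {{EagerRebalanceSketch}} and its methods are hypothetical, the partition numbers are made up, and it assumes that "newly assigned" is simply the new assignment minus whatever the consumer still considers owned.

```java
import java.util.Set;
import java.util.TreeSet;

// Hypothetical model of consumer-side ownership tracking; not Kafka's classes.
final class EagerRebalanceSketch {
    // Partitions the consumer currently believes it owns.
    private final Set<Integer> owned = new TreeSet<>();

    // Models onPartitionsRevoked() under EAGER: everything owned is given up.
    Set<Integer> revokeAll() {
        Set<Integer> revoked = new TreeSet<>(owned);
        owned.clear();
        return revoked;
    }

    // Models the "newly assigned partitions" computation (an assumption):
    // the new assignment minus what is still considered owned.
    Set<Integer> assign(Set<Integer> assigned) {
        Set<Integer> added = new TreeSet<>(assigned);
        added.removeAll(owned);
        owned.clear();
        owned.addAll(assigned);
        return added;
    }

    public static void main(String[] args) {
        // Expected EAGER flow: revoke everything, then assign.
        EagerRebalanceSketch expected = new EagerRebalanceSketch();
        expected.assign(Set.of(1, 2, 3, 4));
        expected.revokeAll();
        System.out.println("with revoke:    " + expected.assign(Set.of(1, 2)));

        // Flow with the revoke step skipped: the old assignment is still owned.
        EagerRebalanceSketch skipped = new EagerRebalanceSketch();
        skipped.assign(Set.of(1, 2, 3, 4));
        System.out.println("without revoke: " + skipped.assign(Set.of(1, 2)));
    }
}
```

Under these assumptions the first flow reports the whole assignment as newly added, while the second reports nothing as newly added when the new assignment is a subset of the old one.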
However, we recently ran into a situation where we received an assignment while the consumer's existing assignment was non-empty:

||Pod||Message||
|aggregator-ff95b6cf-r2jkm|2023-12-07 23:02:25,715\{UTC} [KafkaConsumerAutoService] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer.metric-data-points.metric-aggregator, groupId=metric-aggregator] Notifying assignor about the {*}new Assignment{*}(partitions=[topic-26, topic-44, topic-60, topic-71, topic-78, topic-82, topic-88, topic-101, topic-105, topic-109, topic-113, topic-117, topic-123, topic-130, topic-137, topic-141])|
|aggregator-ff95b6cf-r2jkm|2023-12-07 23:02:25,715\{UTC} [KafkaConsumerAutoService] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer.metric-data-points.metric-aggregator, groupId=metric-aggregator] Adding {*}newly assigned partitions{*}: topic-26, topic-44, topic-60, topic-71, topic-78, topic-82, topic-88, topic-101, topic-105, topic-109, topic-113, topic-117, topic-123, topic-130, topic-137, topic-141|
|aggregator-ff95b6cf-r2jkm|2023-12-07 23:02:31,923\{UTC} [kafka-coordinator-heartbeat-thread \| metric-aggregator] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer.metric-data-points.metric-aggregator, groupId=metric-aggregator] *Request joining group* due to: group is already rebalancing|
|aggregator-ff95b6cf-r2jkm|2023-12-07 23:02:32,132\{UTC} [KafkaConsumerAutoService] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer.metric-data-points.metric-aggregator, groupId=metric-aggregator] Successfully joined group with generation Generation\{generationId=12417, memberId='consumer.metric-data-points.metric-aggregator-a43be1e2-eba1-444c-96dd-ccb52cdba223', protocol='sticky'}|
|aggregator-ff95b6cf-r2jkm|2023-12-07 23:02:32,134\{UTC} [KafkaConsumerAutoService] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer.metric-data-points.metric-aggregator, groupId=metric-aggregator] Successfully synced group in generation Generation{generationId={*}12417{*}, memberId='consumer.metric-data-points.metric-aggregator-a43be1e2-eba1-444c-96dd-ccb52cdba223', protocol='sticky'}|
|aggregator-ff95b6cf-r2jkm|2023-12-07 23:02:32,135\{UTC} [KafkaConsumerAutoService] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer.metric-data-points.metric-aggregator, groupId=metric-aggregator] Notifying assignor about the {*}new Assignment{*}(partitions=[topic-26, topic-44, topic-60, topic-71, topic-78, topic-82, topic-88, topic-101, topic-105, topic-109, topic-113, topic-117])|
|aggregator-ff95b6cf-r2jkm|2023-12-07 23:02:32,135\{UTC} [KafkaConsumerAutoService] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer.metric-data-points.metric-aggregator, groupId=metric-aggregator] Adding {*}newly assigned partitions{*}: |

Here you can see we are assigned partitions:

26, 44, 60, 71, 78, 82, 88, 101, 105, 109, 113, 117, 123, 130, 137, 141

and promptly see them all as newly added when they are passed to {{onPartitionsAssigned()}}. About 6 seconds later the heartbeat thread notices another rebalance and requests to join. The join quickly succeeds and the sync succeeds almost immediately afterwards. We then get a new assignment:

26, 44, 60, 71, 78, 82, 88, 101, 105, 109, 113, 117

This is a subset of the partitions we were assigned previously, missing 123, 130, 137, and 141. Because {{onPartitionsRevoked()}} was not called at the beginning of this rebalance, the consumer still holds the old assignment as its current assignment instead of an empty one, and so it concludes that there are no newly assigned partitions.

Using this diagram from [KIP-429|https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol#KIP429:KafkaConsumerIncrementalRebalanceProtocol-ConsumerRebalanceProtocol:Stop-The-WorldEffect] as a visual guide, it seems that we sent the JoinGroup and succeeded in joining, but then skipped straight to the SyncGroup and got our assignment.
!https://cwiki.apache.org/confluence/download/attachments/103090108/Rebalance%20Today.jpg?version=1&modificationDate=1554837450000&api=v2!

Here are the group coordinator assignment logs for the initial assignment and then the assignment without a revoke. You can see they are sequential generations 12416 and 12417, so none are missed.

||Pod||Message||
|aggregator-ff95b6cf-nzkrx|2023-12-07 23:02:25,709\{UTC} [KafkaConsumerAutoService] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer.metric-data-points.metric-aggregator, groupId=metric-aggregator] Finished assignment for group at generation {*}12416{*}: \{consumer.metric-data-points.metric-aggregator-8d718b78-9b7e-4803-afda-49b1c35e2654=Assignment(partitions=[topic-28, topic-30, topic-57, topic-77, topic-128, topic-129, topic-133, topic-134, topic-136, topic-27, topic-56, topic-119, topic-22, topic-65, topic-43, topic-64]), consumer.metric-data-points.metric-aggregator-ab7e7027-3ca9-4dac-bb09-71958b515ab6=Assignment(partitions=[topic-35, topic-45, topic-62, topic-72, topic-79, topic-83, topic-90, topic-102, topic-106, topic-110, topic-114, topic-118, topic-124, topic-131, topic-138, topic-142]), consumer.metric-data-points.metric-aggregator-da772740-a386-491a-aa6d-69b26550dbf9=Assignment(partitions=[topic-37, topic-49, topic-66, topic-73, topic-80, topic-85, topic-95, topic-103, topic-107, topic-111, topic-115, topic-120, topic-125, topic-132, topic-139, topic-143]), consumer.metric-data-points.metric-aggregator-779655ef-3297-44d1-8edf-2e4b6c9b4f55=Assignment(partitions=[topic-7, topic-11, topic-14, topic-16, topic-21, topic-29, topic-38, topic-42, topic-52, topic-63, topic-69, topic-87, topic-91, topic-93, topic-96, topic-98]), consumer.metric-data-points.metric-aggregator-a43be1e2-eba1-444c-96dd-ccb52cdba223=Assignment(partitions=[topic-26, topic-44, topic-60, topic-71, topic-78, topic-82, topic-88, topic-101, topic-105, topic-109, topic-113, topic-117, topic-123, topic-130, topic-137, topic-141]), consumer.metric-data-points.metric-aggregator-d06d548e-1e7f-48fb-98a0-2f6072fa3a5b=Assignment(partitions=[topic-9, topic-13, topic-18, topic-20, topic-25, topic-32, topic-33, topic-34, topic-36, topic-47, topic-48, topic-50, topic-51, topic-53, topic-59, topic-61]), consumer.metric-data-points.metric-aggregator-841a72e3-7ae4-4734-9793-7e947c4fa0a4=Assignment(partitions=[topic-19, topic-39, topic-54, topic-70, topic-74, topic-81, topic-86, topic-99, topic-104, topic-108, topic-112, topic-116, topic-122, topic-127, topic-135, topic-140]), consumer.metric-data-points.metric-aggregator-939df068-59ea-43d7-bb48-3c6baf97023a=Assignment(partitions=[topic-8, topic-12, topic-15, topic-17, topic-23, topic-31, topic-40, topic-46, topic-55, topic-67, topic-75, topic-89, topic-92, topic-94, topic-97, topic-100]), consumer.metric-data-points.metric-aggregator-933f7c51-27f5-436f-9b56-640eb55d64b2=Assignment(partitions=[topic-0, topic-1, topic-2, topic-3, topic-4, topic-5, topic-6, topic-10, topic-68, topic-41, topic-58, topic-121, topic-24, topic-84, topic-76, topic-126])}|
|aggregator-ff95b6cf-nzkrx|2023-12-07 23:02:32,119\{UTC} [KafkaConsumerAutoService] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer.metric-data-points.metric-aggregator, groupId=metric-aggregator] Finished assignment for group at generation {*}12417{*}: \{consumer.metric-data-points.metric-aggregator-8d718b78-9b7e-4803-afda-49b1c35e2654=Assignment(partitions=[topic-28, topic-30, topic-57, topic-77, topic-128, topic-129, topic-133, topic-134, topic-136, topic-27, topic-56, topic-119]), consumer.metric-data-points.metric-aggregator-ab7e7027-3ca9-4dac-bb09-71958b515ab6=Assignment(partitions=[topic-35, topic-45, topic-62, topic-72, topic-79, topic-83, topic-90, topic-102, topic-106, topic-110, topic-114, topic-118]), consumer.metric-data-points.metric-aggregator-76fd27b3-fb5b-49aa-af2a-b53150897cf9=Assignment(partitions=[topic-22, topic-51, topic-61, topic-76, topic-92, topic-96, topic-100, topic-124, topic-127, topic-132, topic-138, topic-141]), consumer.metric-data-points.metric-aggregator-9f6ab144-85a7-468c-9aa1-11f5445084b9=Assignment(partitions=[topic-43, topic-59, topic-65, topic-91, topic-94, topic-98, topic-123, topic-126, topic-131, topic-137, topic-140, topic-143]), consumer.metric-data-points.metric-aggregator-da772740-a386-491a-aa6d-69b26550dbf9=Assignment(partitions=[topic-37, topic-49, topic-66, topic-73, topic-80, topic-85, topic-95, topic-103, topic-107, topic-111, topic-115, topic-120]), consumer.metric-data-points.metric-aggregator-779655ef-3297-44d1-8edf-2e4b6c9b4f55=Assignment(partitions=[topic-7, topic-11, topic-14, topic-16, topic-21, topic-29, topic-38, topic-42, topic-52, topic-63, topic-69, topic-87]), consumer.metric-data-points.metric-aggregator-a43be1e2-eba1-444c-96dd-ccb52cdba223=Assignment(partitions=[topic-26, topic-44, topic-60, topic-71, topic-78, topic-82, topic-88, topic-101, topic-105, topic-109, topic-113, topic-117]), consumer.metric-data-points.metric-aggregator-d06d548e-1e7f-48fb-98a0-2f6072fa3a5b=Assignment(partitions=[topic-9, topic-13, topic-18, topic-20, topic-25, topic-32, topic-33, topic-34, topic-36, topic-47, topic-48, topic-50]), consumer.metric-data-points.metric-aggregator-841a72e3-7ae4-4734-9793-7e947c4fa0a4=Assignment(partitions=[topic-19, topic-39, topic-54, topic-70, topic-74, topic-81, topic-86, topic-99, topic-104, topic-108, topic-112, topic-116]), consumer.metric-data-points.metric-aggregator-939df068-59ea-43d7-bb48-3c6baf97023a=Assignment(partitions=[topic-8, topic-12, topic-15, topic-17, topic-23, topic-31, topic-40, topic-46, topic-55, topic-67, topic-75, topic-89]), consumer.metric-data-points.metric-aggregator-91b1e207-0978-430e-9393-679ac44647b8=Assignment(partitions=[topic-24, topic-53, topic-64, topic-84, topic-93, topic-97, topic-122, topic-125, topic-130, topic-135, topic-139, topic-142]), consumer.metric-data-points.metric-aggregator-933f7c51-27f5-436f-9b56-640eb55d64b2=Assignment(partitions=[topic-0, topic-1, topic-2, topic-3, topic-4, topic-5, topic-6, topic-10, topic-68, topic-41, topic-58, topic-121])}|

--
This message was sent by Atlassian Jira
(v8.20.10#820010)