[ https://issues.apache.org/jira/browse/KAFKA-19356?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Lianet Magrans updated KAFKA-19356:
-----------------------------------
    Labels: consumer-threading-refactor kip-848-client-support  (was: )

> AsyncConsumer should ensure consistency of assigned partitions and subscription
> -------------------------------------------------------------------------------
>
>                 Key: KAFKA-19356
>                 URL: https://issues.apache.org/jira/browse/KAFKA-19356
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, consumer
>            Reporter: Lianet Magrans
>            Assignee: Lianet Magrans
>            Priority: Major
>              Labels: consumer-threading-refactor, kip-848-client-support
>             Fix For: 4.1.0
>
>
> With the new AsyncConsumer, the client reconciles partitions assigned by the
> coordinator and does not perform any validation against the subscription
> (letting the coordinator drive, and expecting that subscription/assignments
> should become consistent on the next heartbeat (HB) request/response round).
> Still, this allows for a window of time where they would not be consistent on
> the client, and I would expect the main risk to be related to fetching: a
> subscription change will clear the fetch buffer in the async consumer, but
> won't prevent processing fetch responses for previous requests or issuing new
> ones for an assigned partition that is not in the subscription anymore?
> (fetched records are only discarded
> [here|https://github.com/apache/kafka/blob/cc0f06554bacc9fe086c4f4c5ecee1b5dc75ae9b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchCollector.java#L153-L159]
> if the partition is not assigned or not fetchable, so that would prevent
> this risk from what I can see, though I could be missing something).
> The Classic consumer ensures it does not maintain/fetch assigned partitions
> that are not in the subscription because:
> * classicConsumer does not take assignments received from the coordinator
> that are not in the subscription ("ignores"/rejoins if it gets them)
> ([here|https://github.com/apache/kafka/blob/cc0f06554bacc9fe086c4f4c5ecee1b5dc75ae9b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L410C28-L410C62]).
> For this case, we could consider doing the same in the AsyncConsumer, maybe
> by not reconciling assignments that are received in the HB response but are
> not in the subscription.
> * classicConsumer proactively revokes assigned partitions not in the
> subscription on every onJoinPrepare (blocking, updating the assignment before
> fetching,
> [here|https://github.com/apache/kafka/blob/cc0f06554bacc9fe086c4f4c5ecee1b5dc75ae9b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L838-L849]).
> This case needs a bit more consideration. We surely want to keep the client
> away from driving assignment logic, to keep it only on the coordinator, but
> if we say we're waiting for the coordinator to revoke that partition (on the
> next HB req/resp round), we still need to ensure on the client that we don't
> consider such a partition "fetchable", and that we discard fetch responses
> for it.
> Note that these validations could be applied on the client for explicit
> subscription or client-side regex only. In the case of broker-side regex the
> client cannot perform any validation, given that it never computes the regex
> (and should not).
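
The two client-side checks discussed above could look roughly like the following. This is only a minimal sketch and not the actual AsyncConsumer code: the class SubscriptionConsistencySketch, the helpers partitionsToReconcile and isFetchable, and the local TopicPartition record (a stand-in for Kafka's own class so that the example compiles without dependencies) are all hypothetical names. It assumes an explicit subscription or a client-side regex already resolved to topic names, since the client cannot validate anything for a broker-side regex.

```java
import java.util.LinkedHashSet;
import java.util.Set;

final class SubscriptionConsistencySketch {

    // Stand-in for org.apache.kafka.common.TopicPartition (assumption: only
    // topic and partition matter for this illustration).
    record TopicPartition(String topic, int partition) {}

    // Hypothetical helper for the first idea: from the assignment received in
    // an HB response, keep only partitions whose topic is still subscribed.
    static Set<TopicPartition> partitionsToReconcile(Set<TopicPartition> receivedAssignment,
                                                     Set<String> subscribedTopics) {
        Set<TopicPartition> result = new LinkedHashSet<>();
        for (TopicPartition tp : receivedAssignment) {
            if (subscribedTopics.contains(tp.topic())) {
                result.add(tp);
            }
        }
        return result;
    }

    // Hypothetical helper for the second idea: while waiting for the
    // coordinator to revoke a partition, treat it as fetchable only if it is
    // assigned AND its topic is still in the subscription.
    static boolean isFetchable(TopicPartition tp,
                               Set<TopicPartition> assignedPartitions,
                               Set<String> subscribedTopics) {
        return assignedPartitions.contains(tp) && subscribedTopics.contains(tp.topic());
    }

    public static void main(String[] args) {
        // The subscription changed to "orders" only, but "payments-0" is
        // still assigned until the coordinator revokes it.
        Set<String> subscription = Set.of("orders");
        TopicPartition orders0 = new TopicPartition("orders", 0);
        TopicPartition payments0 = new TopicPartition("payments", 0);
        Set<TopicPartition> assigned = Set.of(orders0, payments0);

        System.out.println(partitionsToReconcile(assigned, subscription));
        System.out.println(isFetchable(orders0, assigned, subscription));   // true
        System.out.println(isFetchable(payments0, assigned, subscription)); // false
    }
}
```

Neither helper changes the assignment itself, so the coordinator would still drive revocation. The client would only avoid reconciling, fetching, or returning records for partitions that are no longer part of the subscription.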