[ 
https://issues.apache.org/jira/browse/KAFKA-19356?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-19356:
-----------------------------------
    Labels: consumer-threading-refactor kip-848-client-support  (was: )

> AsyncConsumer should ensure consistency of assigned partitions and 
> subscription
> -------------------------------------------------------------------------------
>
>                 Key: KAFKA-19356
>                 URL: https://issues.apache.org/jira/browse/KAFKA-19356
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, consumer
>            Reporter: Lianet Magrans
>            Assignee: Lianet Magrans
>            Priority: Major
>              Labels: consumer-threading-refactor, kip-848-client-support
>             Fix For: 4.1.0
>
>
> With the new AsyncConsumer, the client reconciles the partitions assigned by 
> the coordinator without validating them against the subscription. It lets 
> the coordinator drive, expecting the subscription and the assignment to 
> become consistent on the next heartbeat (HB) req/resp round. 
> Still, this leaves a window of time in which they are not consistent on the 
> client. I would expect the main risk to be related to fetching: a 
> subscription change will clear the fetch buffer in the async consumer but, as 
> far as I can tell, won't prevent processing fetch responses for previous 
> requests, or issuing new requests, for an assigned partition that is no 
> longer in the subscription. 
> (Fetched records are only discarded 
> [here|https://github.com/apache/kafka/blob/cc0f06554bacc9fe086c4f4c5ecee1b5dc75ae9b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchCollector.java#L153-L159]
>  if the partition is not assigned or not fetchable, so that would prevent 
> this risk from what I can see, but I could be missing something.) 
> The Classic consumer ensures it does not maintain/fetch assigned partitions 
> that are not in the subscription because:
>  * The Classic consumer does not accept assignments received from the 
> coordinator that are not in the subscription (it "ignores" them and rejoins) 
> ([here|https://github.com/apache/kafka/blob/cc0f06554bacc9fe086c4f4c5ecee1b5dc75ae9b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L410C28-L410C62]).
>  For this case, we could consider doing something similar in the 
> AsyncConsumer, for example by not reconciling partitions that are received 
> in the HB response but are not in the subscription.
>  * The Classic consumer proactively revokes assigned partitions that are not 
> in the subscription on every onJoinPrepare (blocking, and updating the 
> assignment before fetching, 
> [here|https://github.com/apache/kafka/blob/cc0f06554bacc9fe086c4f4c5ecee1b5dc75ae9b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L838-L849]).
>  This case needs a bit more consideration. We certainly want to keep 
> assignment logic on the coordinator only, and keep the client from driving 
> it. But if we wait for the coordinator to revoke that partition (on the next 
> HB req/resp round), the client must still ensure that it does not consider 
> the partition "fetchable" in the meantime, and that it discards fetch 
> responses for it. 
> Note that these validations could be applied on the client for explicit 
> subscriptions or client-side regex only. With a broker-side regex, the 
> client cannot perform any validation, given that it never evaluates the 
> regex (and should not).
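
The two client-side checks discussed above (not reconciling partitions outside the subscription, and not treating such partitions as fetchable) could look roughly like the sketch below. It is only an illustration under stated assumptions: `SubscriptionConsistency`, `retainSubscribed` and `isFetchable` are hypothetical names, the nested `TopicPartition` record is a stand-in for the real Kafka class, and nothing here reflects the actual AsyncConsumer internals. It also assumes the client knows the subscribed topic names, which holds only for explicit subscriptions and client-side regex.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical helper, not part of the Kafka client: it only illustrates
// the two client-side checks described in the issue.
final class SubscriptionConsistency {

    // Minimal stand-in for org.apache.kafka.common.TopicPartition.
    record TopicPartition(String topic, int partition) {}

    // Keep only the assigned partitions whose topic is still subscribed,
    // e.g. before reconciling an assignment received in a HB response.
    static Set<TopicPartition> retainSubscribed(Set<TopicPartition> assigned,
                                                Set<String> subscribedTopics) {
        Set<TopicPartition> result = new LinkedHashSet<>();
        for (TopicPartition tp : assigned) {
            if (subscribedTopics.contains(tp.topic())) {
                result.add(tp);
            }
        }
        return result;
    }

    // A partition is treated as fetchable only if it is assigned AND its
    // topic is in the current subscription; fetch responses for any other
    // partition would be discarded by the caller.
    static boolean isFetchable(TopicPartition tp,
                               Set<TopicPartition> assigned,
                               Set<String> subscribedTopics) {
        return assigned.contains(tp) && subscribedTopics.contains(tp.topic());
    }
}
```

With a subscription of `orders` only and an assignment containing `orders-0` and `payments-0`, `retainSubscribed` would keep just `orders-0`, and `isFetchable` would return false for `payments-0` until the coordinator revokes it.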



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
