[ https://issues.apache.org/jira/browse/KAFKA-19356?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Lianet Magrans updated KAFKA-19356:
-----------------------------------
    Description: 
With the new AsyncConsumer, the client reconciles the partitions assigned by the coordinator and does not validate them against the subscription. It lets the coordinator drive, expecting the subscription and the assignment to become consistent on the next heartbeat (HB) request/response round.

Still, this leaves a window of time in which the two are not consistent on the client, and I would expect the main risk to be in fetching. A subscription change clears the fetch buffer in the async consumer, but, as far as I can tell, it does not prevent processing fetch responses to earlier requests, or issuing new fetch requests, for an assigned partition that is no longer in the subscription. Fetched records are only discarded
[here|https://github.com/apache/kafka/blob/cc0f06554bacc9fe086c4f4c5ecee1b5dc75ae9b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchCollector.java#L153-L159]
if the partition is not assigned or not fetchable. During that window, a partition that is no longer subscribed is typically still both assigned and fetchable, so from what I can see that check would not prevent this risk (I could be missing something).
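To make the gap concrete, here is a minimal Java sketch (Java 16+ for records). {{FetchDiscardSketch}}, {{Tp}} and both methods are hypothetical: they model the discard condition described above with plain sets and do not reproduce the real FetchCollector code. With the condition as described, records for a partition that is still assigned and fetchable, but whose topic was just unsubscribed, are kept; an extra subscription check would drop them.

```java
import java.util.Set;

// Hypothetical sketch, NOT Kafka's FetchCollector: it only models the discard
// condition described in this issue plus one possible extra subscription check.
public class FetchDiscardSketch {

    // Simplified stand-in for a topic-partition (the real client uses
    // org.apache.kafka.common.TopicPartition).
    record Tp(String topic, int partition) {}

    // Condition as described in this issue: records are dropped only when the
    // partition is not assigned or not fetchable.
    static boolean discardCurrent(Tp tp, Set<Tp> assigned, Set<Tp> fetchable) {
        return !assigned.contains(tp) || !fetchable.contains(tp);
    }

    // Hypothetical stricter check: also drop records for a partition whose
    // topic is no longer in the explicit subscription.
    static boolean discardWithSubscriptionCheck(Tp tp, Set<Tp> assigned,
                                                Set<Tp> fetchable,
                                                Set<String> subscribedTopics) {
        return discardCurrent(tp, assigned, fetchable)
                || !subscribedTopics.contains(tp.topic());
    }

    public static void main(String[] args) {
        Tp orders0 = new Tp("orders", 0);
        // Inconsistency window: orders-0 is still assigned and fetchable, but
        // the application has already unsubscribed from "orders".
        Set<Tp> assigned = Set.of(orders0);
        Set<Tp> fetchable = Set.of(orders0);
        Set<String> subscribed = Set.of("payments");

        System.out.println(discardCurrent(orders0, assigned, fetchable)); // false
        System.out.println(discardWithSubscriptionCheck(
                orders0, assigned, fetchable, subscribed));               // true
    }
}
```

The point of the sketch is only that the subscription has to be part of the condition; where such a check would actually live in the async consumer is still open.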

The classic consumer ensures that it does not keep or fetch assigned partitions that are not in the subscription, in 2 ways:
 * The classic consumer does not accept assignments received from the coordinator that are not in the subscription (it ignores them and rejoins if it gets them, [here|https://github.com/apache/kafka/blob/cc0f06554bacc9fe086c4f4c5ecee1b5dc75ae9b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L410C28-L410C62]). For this case, we could consider doing something similar in the AsyncConsumer, for example not reconciling partitions that are received in the HB response but are not in the subscription.
 * The classic consumer proactively revokes assigned partitions that are not in the subscription on every onJoinPrepare (this blocks and updates the assignment before fetching, [here|https://github.com/apache/kafka/blob/cc0f06554bacc9fe086c4f4c5ecee1b5dc75ae9b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L838-L849]). This case needs more thought. We certainly want to keep assignment logic on the coordinator rather than have the client drive it, but if we wait for the coordinator to revoke that partition on the next HB request/response round, we still need to ensure on the client that we do not consider such a partition "fetchable", and that we discard fetch responses for it.
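A rough sketch of how the two classic-consumer behaviors could translate to the AsyncConsumer, assuming an explicit topic subscription. Everything here ({{AssignmentGuardSketch}}, {{reconcile}}, {{fetchablePartitions}}) is hypothetical: option 1 simply drops non-subscribed partitions from the HB target instead of rejoining, and option 2 leaves revocation to the coordinator while excluding those partitions from fetching. Revocation callbacks and the real reconciliation state machine are omitted.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical sketch, not the AsyncConsumer's membership/reconciliation code.
public class AssignmentGuardSketch {

    record Tp(String topic, int partition) {}

    private Set<String> subscribedTopics = Set.of();
    private final Set<Tp> assigned = new HashSet<>();

    // Option 1: only reconcile the partitions from the HB response whose topic
    // is in the subscription (the rest are ignored here, not rejoined on).
    Set<Tp> reconcile(Set<Tp> targetFromHeartbeat) {
        Set<Tp> accepted = targetFromHeartbeat.stream()
                .filter(tp -> subscribedTopics.contains(tp.topic()))
                .collect(Collectors.toSet());
        assigned.clear();          // simplified: target replaces current assignment
        assigned.addAll(accepted);
        return accepted;
    }

    // Option 2: a subscription change does not revoke anything locally; the
    // coordinator still drives revocation on the next HB round.
    void subscribe(Set<String> topics) {
        subscribedTopics = Set.copyOf(topics);
    }

    Set<Tp> assignedPartitions() {
        return Set.copyOf(assigned);
    }

    // ...but partitions of unsubscribed topics are no longer "fetchable".
    Set<Tp> fetchablePartitions() {
        return assigned.stream()
                .filter(tp -> subscribedTopics.contains(tp.topic()))
                .collect(Collectors.toSet());
    }

    public static void main(String[] args) {
        AssignmentGuardSketch c = new AssignmentGuardSketch();
        c.subscribe(Set.of("orders", "payments"));
        c.reconcile(Set.of(new Tp("orders", 0), new Tp("payments", 0)));

        // The application narrows its subscription before the coordinator
        // has revoked orders-0.
        c.subscribe(Set.of("payments"));
        System.out.println(c.assignedPartitions().size()); // 2
        System.out.println(c.fetchablePartitions());       // [Tp[topic=payments, partition=0]]

        // A stale HB target that still includes orders-0 is filtered.
        System.out.println(c.reconcile(
                Set.of(new Tp("orders", 0), new Tp("payments", 0))).size()); // 1
    }
}
```

Option 2 keeps the coordinator as the only source of assignment decisions; the client merely stops fetching for partitions it already knows it should not consume.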

Note that these validations could only be applied on the client for explicit topic subscriptions or client-side regex. With broker-side regex, the client cannot perform any validation because it never evaluates the regex (and should not).
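A minimal sketch of that distinction, assuming a hypothetical {{Kind}} enum rather than the client's internal subscription types. For explicit topics and client-side regex the client can rule a topic out locally (the regex case uses java.util.regex.Pattern here as an illustration); for broker-side regex it has nothing to evaluate and must defer to the coordinator.

```java
import java.util.Set;
import java.util.regex.Pattern;

// Hypothetical sketch: which subscription kinds allow client-side validation.
public class SubscriptionCheckSketch {

    // Hypothetical enum, not Kafka's internal subscription types.
    enum Kind { EXPLICIT_TOPICS, CLIENT_SIDE_REGEX, BROKER_SIDE_REGEX }

    private final Kind kind;
    private final Set<String> topics; // only used for EXPLICIT_TOPICS
    private final Pattern pattern;    // only used for CLIENT_SIDE_REGEX

    private SubscriptionCheckSketch(Kind kind, Set<String> topics, Pattern pattern) {
        this.kind = kind;
        this.topics = topics;
        this.pattern = pattern;
    }

    static SubscriptionCheckSketch explicit(Set<String> topics) {
        return new SubscriptionCheckSketch(Kind.EXPLICIT_TOPICS, Set.copyOf(topics), null);
    }

    static SubscriptionCheckSketch clientRegex(String regex) {
        return new SubscriptionCheckSketch(Kind.CLIENT_SIDE_REGEX, Set.of(), Pattern.compile(regex));
    }

    static SubscriptionCheckSketch brokerRegex() {
        return new SubscriptionCheckSketch(Kind.BROKER_SIDE_REGEX, Set.of(), null);
    }

    // True only when the client can tell, on its own, that the topic is not
    // part of its subscription.
    boolean clearlyNotSubscribed(String topic) {
        return switch (kind) {
            case EXPLICIT_TOPICS -> !topics.contains(topic);
            case CLIENT_SIDE_REGEX -> !pattern.matcher(topic).matches();
            // The client never evaluates a broker-side regex: defer to the coordinator.
            case BROKER_SIDE_REGEX -> false;
        };
    }

    public static void main(String[] args) {
        System.out.println(explicit(Set.of("orders")).clearlyNotSubscribed("payments")); // true
        System.out.println(clientRegex("orders-.*").clearlyNotSubscribed("orders-eu"));  // false
        System.out.println(brokerRegex().clearlyNotSubscribed("payments"));              // false
    }
}
```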


> AsyncConsumer should ensure consistency of assigned partitions and subscription
> -------------------------------------------------------------------------------
>
>                 Key: KAFKA-19356
>                 URL: https://issues.apache.org/jira/browse/KAFKA-19356
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, consumer
>            Reporter: Lianet Magrans
>            Assignee: Lianet Magrans
>            Priority: Major
>              Labels: consumer-threading-refactor, kip-848-client-support
>             Fix For: 4.1.0
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
