[ 
https://issues.apache.org/jira/browse/KAFKA-15640?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17921564#comment-17921564
 ] 

Jun Rao commented on KAFKA-15640:
---------------------------------

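The first issue: this doesn't seem to be a real issue, since 
AbstractFetch.fetchablePartitions() calls subscriptions.fetchablePartitions(), 
which ignores paused partitions. So the background thread won't send a fetch 
for a paused partition, even during the window described.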
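
A minimal sketch of that argument, assuming a much-simplified model of the subscription state. The class and method names below (PausedPartitionSketch, assign, pause) are hypothetical and are not Kafka's actual API. The point it shows: even if a paused partition is briefly missing from the buffer, the pause flag alone keeps it out of the fetchable set.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical, simplified model; NOT Kafka's actual SubscriptionState.
class PausedPartitionSketch {

    // Per-partition state: only the paused flag matters for this sketch.
    static final class PartitionState {
        boolean paused;
    }

    final Map<String, PartitionState> assignment = new LinkedHashMap<>();

    void assign(String partition) {
        assignment.put(partition, new PartitionState());
    }

    // Assumes the partition was assigned first.
    void pause(String partition) {
        assignment.get(partition).paused = true;
    }

    // Mirrors the idea behind subscriptions.fetchablePartitions(): a partition is
    // fetchable only if it is not paused AND the caller's predicate accepts it
    // (for example, "not already buffered").
    Set<String> fetchablePartitions(Predicate<String> isAvailable) {
        Set<String> result = new LinkedHashSet<>();
        for (Map.Entry<String, PartitionState> e : assignment.entrySet()) {
            if (!e.getValue().paused && isAvailable.test(e.getKey())) {
                result.add(e.getKey());
            }
        }
        return result;
    }
}
```

With "topic-0" paused and an empty buffered set (the window between setNextInLineFetch(null) and addAll(...)), fetchablePartitions(tp -> !buffered.contains(tp)) still returns only the unpaused partitions, so no second chunk is requested for "topic-0".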

 

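The second issue: this is not an issue either, since 
subscriptions.fetchablePartitions() checks nextInLineFetch.isConsumed() when 
deciding whether a partition is buffered. Once nextInLineFetch has been fully 
drained, its partition is no longer treated as buffered, so the background 
thread can fetch the next chunk.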
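
A minimal sketch of the buffered-partition check, again assuming a hypothetical, simplified model (DrainedFetchSketch, Fetch, remainingRecords, and bufferedPartitions are illustrative names, not Kafka's actual classes or methods). It shows that a drained next-in-line fetch stops counting as buffered even though it is still referenced.

```java
import java.util.ArrayDeque;
import java.util.LinkedHashSet;
import java.util.Queue;
import java.util.Set;

// Hypothetical, simplified model; NOT Kafka's actual FetchBuffer/CompletedFetch.
class DrainedFetchSketch {

    static final class Fetch {
        final String partition;
        int remainingRecords;

        Fetch(String partition, int remainingRecords) {
            this.partition = partition;
            this.remainingRecords = remainingRecords;
        }

        // A fetch is "consumed" once the application has drained all its records.
        boolean isConsumed() {
            return remainingRecords == 0;
        }
    }

    Fetch nextInLineFetch;
    final Queue<Fetch> completedFetches = new ArrayDeque<>();

    // A partition counts as buffered if it has a pending completed fetch, or if it
    // is the next-in-line fetch and that fetch still has undrained records.
    Set<String> bufferedPartitions() {
        Set<String> buffered = new LinkedHashSet<>();
        if (nextInLineFetch != null && !nextInLineFetch.isConsumed()) {
            buffered.add(nextInLineFetch.partition);
        }
        for (Fetch f : completedFetches) {
            buffered.add(f.partition);
        }
        return buffered;
    }
}
```

In this model, a fetchable-partitions check that excludes bufferedPartitions() lets the background thread request the next chunk as soon as the application drains nextInLineFetch, without waiting for setNextInLineFetch to be called again, so the consumer does not get stuck.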

> Refactor CompletedFetch initialization
> --------------------------------------
>
>                 Key: KAFKA-15640
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15640
>             Project: Kafka
>          Issue Type: Improvement
>          Components: clients, consumer
>            Reporter: Kirk True
>            Assignee: Kirk True
>            Priority: Major
>              Labels: fetcher
>
> The interaction between {{FetchBuffer}}, {{FetchCollector}}, and 
> {{CompletedFetch}} is awkward, to say the least. Per [~junrao]'s comments 
> [here|https://github.com/apache/kafka/pull/14406#discussion_r1349361459] and 
> [here|https://github.com/apache/kafka/pull/14406#discussion_r1350773132], 
> there are three issues...
>  
> First:
> {quote}This is an existing issue. But the way we handle paused partitions in 
> {{collectFetch}} seems problematic. The application thread first calls 
> {{fetchBuffer.setNextInLineFetch(null)}} and then calls 
> {{fetchBuffer.addAll(pausedCompletedFetches)}}. This could leave a brief 
> window where the paused partition is included in neither {{nextInLineFetch}} 
> nor {{completedFetches}}. If the background thread kicks in during that 
> window, it could have fetched another chunk for that partition and added the 
> response back to FetchBuffer. This would violate the assumption that there is 
> no more than one pending {{CompletedFetch}} per partition in FetchBuffer, and 
> could cause records to be returned out of offset order, or duplicates to be 
> returned.
> {quote}
>  
> Second:
> {quote}The second existing issue is with the {{fetchBuffer.setNextInLineFetch}} 
> call in {{collectFetch}}. After all records are drained from 
> {{nextInLineFetch}}, we only call {{setNextInLineFetch}} when there is a new 
> {{completedFetch}}. However, until the drained {{completedFetch}} is removed 
> from {{nextInLineFetch}}, the background thread can't fetch the next chunk. 
> So it seems that we will just be stuck here.
> {quote}
>  
> Third:
> {quote}Currently, {{fetchBuffer.setNextInLineFetch}} and {{fetchBuffer.poll}} 
> are separate operations, and we expect the caller to call them in the right 
> order to avoid a partition going missing from FetchBuffer during the 
> transition. This still leaves a situation in which a partition could be in 
> both {{completedFetches}} and {{nextInLineFetch}} at the same time. It's not 
> a problem for now, but it may be in the future. Could we make them an atomic 
> operation? If not, could we add a comment documenting the correct usage of 
> the API and the impact of a partition being duplicated in 
> {{completedFetches}} and {{nextInLineFetch}}?
> {quote}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
