[ https://issues.apache.org/jira/browse/KAFKA-15640?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17921564#comment-17921564 ]
Jun Rao commented on KAFKA-15640:
---------------------------------

The first issue. This doesn't seem to be a real issue since AbstractFetch.fetchablePartitions() calls subscriptions.fetchablePartitions(), which ignores paused partitions.

The second issue. This is not an issue either since subscriptions.fetchablePartitions() checks nextInLineFetch.isConsumed() when deciding if it's a buffered partition.

> Refactor CompletedFetch initialization
> --------------------------------------
>
>                 Key: KAFKA-15640
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15640
>             Project: Kafka
>          Issue Type: Improvement
>          Components: clients, consumer
>            Reporter: Kirk True
>            Assignee: Kirk True
>            Priority: Major
>              Labels: fetcher
>
> The interaction between {{FetchBuffer}}, {{FetchCollector}}, and
> {{CompletedFetch}} is awkward, to say the least. Per [~junrao]'s comments
> [here|https://github.com/apache/kafka/pull/14406#discussion_r1349361459] and
> [here|https://github.com/apache/kafka/pull/14406#discussion_r1350773132],
> there are three issues...
>
> First:
> {quote}This is an existing issue. But the way we handle paused
> partitions in {{collectFetch}} seems problematic. The application thread
> first calls {{fetchBuffer.setNextInLineFetch(null)}} and then calls
> {{fetchBuffer.addAll(pausedCompletedFetches)}}. This could leave a brief
> window where the paused partition is not included in either
> {{nextInLineFetch}} or {{completedFetches}}. If the background thread
> kicks in in that window, it could have fetched another chunk for that
> partition and added the response back to FetchBuffer. This would violate the
> assumption there is no more than one pending {{CompletedFetch}} per partition
> in FetchBuffer and could cause records returned not in offset order or
> duplicates to be returned.
> {quote}
>
> Second:
> {quote}The second existing issue is on the
> {{fetchBuffer.setNextInLineFetch}} call in {{collectFetch}}. The issue is
> that after all records are drained from {{nextInLineFetch}}. We only call
> {{setNextInLineFetch}} when there is a new {{completedFetch}}. However,
> until the drained {{completedFetch}} is removed from {{nextInLineFetch}},
> the background thread can't fetch the next chunk. So, it seems that we will
> just be stuck here.
> {quote}
>
> Third:
> {quote}Currently, {{fetchBuffer.setNextInLineFetch}} and
> {{fetchBuffer.poll}} are separate operations and we expect the caller to call
> them in the right order to avoid a partition missing in FetchBuffer in the
> transition phase. It still leaves us with the situation that a partition
> could be in both completedFetches and nextInLineFetch at a particular time.
> It's not a problem for now, but it may be in the future. Could we make them
> an atomic operation? If not, could we add a comment to document the correct
> usage of the api and the impact on partition being duplicated in
> completedFetches and nextInLineFetch?
> {quote}

--
This message was sent by Atlassian Jira
(v8.20.10#820010)