[ https://issues.apache.org/jira/browse/KAFKA-17066?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Colin McCabe updated KAFKA-17066:
---------------------------------
Fix Version/s: 4.0.0 (was: 3.9.0)
> New consumer updateFetchPositions should perform all operations in background thread
> ------------------------------------------------------------------------------------
>
> Key: KAFKA-17066
> URL: https://issues.apache.org/jira/browse/KAFKA-17066
> Project: Kafka
> Issue Type: Bug
> Components: clients, consumer
> Affects Versions: 3.8.0
> Reporter: Lianet Magrans
> Assignee: Lianet Magrans
> Priority: Blocker
> Labels: consumer-threading-refactor, kip-848-client-support
> Fix For: 4.0.0
>
>
> The updateFetchPositions function in the new consumer performs several
> actions based on the assigned partitions from the subscriptionState. As
> currently implemented, it fetches committed offsets for the partitions that
> require a position (retrieved from the subscription state in the app thread),
> and then resets positions for the partitions still needing one (retrieved
> from the subscription state, but in the background thread).
> This is problematic, given that the assignment/subscriptionState may change
> in the background thread at any time (e.g. when new partitions are
> reconciled), so we could end up resetting a partition's position to the
> partition offsets without ever having attempted to retrieve its committed
> offsets.
> Consider this sequence for a consumer that owns partition tp0:
> * consumer owns tp0
> * app thread -> updateFetchPositions triggers
> initWithCommittedOffsetsIfNeeded to retrieve committed offsets for assigned
> partitions requiring a position (taking them from
> subscriptions.initializingPartitions()). This will fetch committed offsets
> for tp0 only.
> * background thread -> receives new partition tp1 and completes
> reconciliation (adds it to the subscription state as INITIALIZING, so it
> requires a position)
> * app thread -> updateFetchPositions resets positions for all partitions
> that still don't have a valid position after initWithCommittedOffsetsIfNeeded
> (taking them from subscriptionState.partitionsNeedingReset). This will
> mistakenly reset tp1 to the partition offsets, when in reality it never even
> tried to fetch committed offsets for tp1, because tp1 wasn't assigned when
> initWithCommittedOffsetsIfNeeded ran.
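The interleaving above can be replayed deterministically in a toy model. This is a minimal sketch, not Kafka client code: `RaceReplay`, `ToySubscriptionState`, the committed offsets (tp0 at 10, tp1 at 42) and the reset target of 100 are all hypothetical. What it models is the core issue: the committed-offset step and the reset step each query the subscription state separately, so tp1 shows up only in the second lookup.

```java
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical, heavily simplified model of the race in KAFKA-17066.
 * None of these types are the real Kafka client classes; they only mimic
 * the two separate subscription-state lookups in updateFetchPositions.
 */
public class RaceReplay {

    // Stand-in for SubscriptionState: partition -> position (null = INITIALIZING).
    static final class ToySubscriptionState {
        private final Map<String, Long> positions = new HashMap<>();

        void assign(String tp) { positions.put(tp, null); }

        // Mimics initializingPartitions(): partitions with no position yet.
        Set<String> initializingPartitions() {
            Set<String> result = new LinkedHashSet<>();
            positions.forEach((tp, pos) -> { if (pos == null) result.add(tp); });
            return result;
        }

        // Mimics partitionsNeedingReset(): evaluated later, against the CURRENT state.
        Set<String> partitionsNeedingReset() { return initializingPartitions(); }

        void seek(String tp, long offset) { positions.put(tp, offset); }

        Map<String, Long> positions() { return positions; }
    }

    // Hypothetical committed offsets and the offset a "latest" reset would pick.
    static final Map<String, Long> COMMITTED = Map.of("tp0", 10L, "tp1", 42L);
    static final long LATEST = 100L;

    static Map<String, Long> replay() {
        ToySubscriptionState subs = new ToySubscriptionState();
        subs.assign("tp0");                                   // consumer owns tp0

        // App thread: snapshot for initWithCommittedOffsetsIfNeeded -> {tp0}.
        Set<String> toInit = subs.initializingPartitions();

        // Background thread reconciles tp1 in the meantime.
        subs.assign("tp1");

        // Committed offsets are applied only to the earlier snapshot.
        for (String tp : toInit) subs.seek(tp, COMMITTED.get(tp));

        // App thread: the second, later lookup now also returns tp1.
        for (String tp : subs.partitionsNeedingReset()) subs.seek(tp, LATEST);

        return subs.positions();
    }

    public static void main(String[] args) {
        Map<String, Long> positions = replay();
        System.out.println("tp0=" + positions.get("tp0") + " tp1=" + positions.get("tp1"));
    }
}
```

Running it prints `tp0=10 tp1=100`: tp1 lands on the reset target even though it has a committed offset of 42 that was never looked up.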
> We should consider moving updateFetchPositions to the background thread as a
> single event, which would safely use the subscriptionState object and apply
> all actions involved in updateFetchPositions to the same consistent set of
> partitions assigned at that moment.
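One way to picture the proposed direction, as a sketch under stated assumptions: a single-threaded executor stands in for the background thread, which is the only thread touching a toy subscription state, and both reconciliation and the whole updateFetchPositions logic are submitted to it as events. `SingleEventFix`, `reconcile`, `demo` and the offset values are hypothetical; the real consumer's event machinery is not modeled here.

```java
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Hypothetical sketch: all subscription-state access, including the complete
 * updateFetchPositions logic, runs as events on one background thread.
 * Not the real Kafka consumer API.
 */
public class SingleEventFix {

    // Hypothetical committed offsets and "latest" reset target.
    static final Map<String, Long> COMMITTED = Map.of("tp0", 10L, "tp1", 42L);
    static final long LATEST = 100L;

    // Toy subscription state: partition -> position (null = INITIALIZING).
    // Only ever touched from the background executor.
    private final Map<String, Long> positions = new HashMap<>();
    private final ExecutorService background = Executors.newSingleThreadExecutor();

    // Reconciliation is an event on the same thread, so it cannot interleave
    // with a running updateFetchPositions event.
    CompletableFuture<Void> reconcile(String tp) {
        return CompletableFuture.runAsync(() -> positions.put(tp, null), background);
    }

    // One event: snapshot once, then apply both steps to that same snapshot.
    CompletableFuture<Map<String, Long>> updateFetchPositions() {
        return CompletableFuture.supplyAsync(() -> {
            Set<String> needPosition = new LinkedHashSet<>();
            positions.forEach((tp, pos) -> { if (pos == null) needPosition.add(tp); });
            // Step 1: initialize from committed offsets where they exist.
            for (String tp : needPosition) {
                Long committed = COMMITTED.get(tp);
                if (committed != null) positions.put(tp, committed);
            }
            // Step 2: reset only snapshot partitions that still have no position.
            for (String tp : needPosition) {
                if (positions.get(tp) == null) positions.put(tp, LATEST);
            }
            return new HashMap<>(positions); // copy handed back to the app thread
        }, background);
    }

    void close() { background.shutdown(); }

    static Map<String, Long> demo() {
        SingleEventFix consumer = new SingleEventFix();
        try {
            consumer.reconcile("tp0").join();
            CompletableFuture<Map<String, Long>> first = consumer.updateFetchPositions();
            consumer.reconcile("tp1");            // queued behind the first event (FIFO)
            System.out.println("first:  " + first.join());
            return consumer.updateFetchPositions().join();
        } finally {
            consumer.close();
        }
    }

    public static void main(String[] args) {
        System.out.println("second: " + demo());
    }
}
```

Because the executor runs tasks in FIFO order, tp1 reconciled after the first event was queued is not seen by that event at all; the next updateFetchPositions call picks it up and initializes it from its committed offset (42) instead of resetting it.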
--
This message was sent by Atlassian Jira
(v8.20.10#820010)