[
https://issues.apache.org/jira/browse/KAFKA-15641?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Kirk True reassigned KAFKA-15641:
---------------------------------
Assignee: (was: Kirk True)
> Investigate CompletedFetch handleInitializeErrors for accuracy
> --------------------------------------------------------------
>
> Key: KAFKA-15641
> URL: https://issues.apache.org/jira/browse/KAFKA-15641
> Project: Kafka
> Issue Type: Bug
> Components: clients, consumer
> Reporter: Kirk True
> Priority: Major
> Labels: fetcher
>
> The {{FetcherTest}} and {{FetchRequestManagerTest}} classes have a test named
> testFetchedRecordsAfterSeek, which [upon closer
> inspection|https://github.com/apache/kafka/pull/14406#discussion_r1347894828]
> may reveal some incorrect logic in {{FetchCollector.handleInitializeErrors}}.
> Here is the test code:
> {code:java}
> @Test
> public void testFetchedRecordsAfterSeek() {
>     buildFetcher(OffsetResetStrategy.NONE,
>             new ByteArrayDeserializer(),
>             new ByteArrayDeserializer(),
>             2,
>             IsolationLevel.READ_UNCOMMITTED);
>     assignFromUser(singleton(tp0));
>     // Step 1: seek to offset 0 of our partition.
>     subscriptions.seek(tp0, 0);
>     // Step 2: issue a mock broker request to fetch data from the current
>     // offset in our local state, i.e. offset 0.
>     assertTrue(sendFetches() > 0);
>     // Step 3: mock an OFFSET_OUT_OF_RANGE response from the broker.
>     client.prepareResponse(fullFetchResponse(tidp0, records,
>             Errors.OFFSET_OUT_OF_RANGE, 100L, 0));
>     // Step 4: process the network I/O to receive the response from the
>     // broker with the OFFSET_OUT_OF_RANGE that was injected. Note, however,
>     // that we haven't "collected" the fetch data included in the response.
>     networkClientDelegate.poll(time.timer(0));
>     // Step 5: validate that the partition is not marked as needing its
>     // offset reset. The response validation logic is performed during the
>     // fetch collection, which doesn't happen until assertEmptyFetch below.
>     assertFalse(subscriptions.isOffsetResetNeeded(tp0));
>     // Step 6: update the partition's position in our local state to offset
>     // 2. We still haven't collected the fetch, so we haven't performed any
>     // validation of the fetch response.
>     subscriptions.seek(tp0, 2);
>     // Step 7: perform the fetch collection. As part of that process, error
>     // handling is performed. Since we intentionally injected an error above,
>     // this error will be checked and handled in the
>     // FetchCollector.handleInitializeErrors method. When handling
>     // OFFSET_OUT_OF_RANGE, handleInitializeErrors will notice that the
>     // original requested offset (0) is different from the state of our
>     // current offset (2).
>     assertEmptyFetch("Should not return records or advance position after "
>             + "seeking to end of topic partition");
> }
> {code}
> Here is the code from {{FetchCollector.handleInitializeErrors}}:
> {code:java}
> private void handleInitializeErrors(final CompletedFetch completedFetch,
>                                     final Errors error) {
>     final TopicPartition tp = completedFetch.partition;
>     final long fetchOffset = completedFetch.nextFetchOffset();
>     . . .
>     if (error == Errors.OFFSET_OUT_OF_RANGE) {
>         Optional<Integer> clearedReplicaId =
>                 subscriptions.clearPreferredReadReplica(tp);
>         if (!clearedReplicaId.isPresent()) {
>             // If there's no preferred replica to clear, we're fetching
>             // from the leader so handle this error normally
>             SubscriptionState.FetchPosition position =
>                     subscriptions.position(tp);
>             if (position == null || fetchOffset != position.offset) {
>                 log.debug("Discarding stale fetch response for partition {} "
>                         + "since the fetched offset {} "
>                         + "does not match the current offset {}",
>                         tp, fetchOffset, position);
>             } else {
>                 String errorMessage = "Fetch position " + position
>                         + " is out of range for partition " + tp;
>                 if (subscriptions.hasDefaultOffsetResetPolicy()) {
>                     log.info("{}, resetting offset", errorMessage);
>                     subscriptions.requestOffsetReset(tp);
>                 } else {
>                     log.info("{}, raising error to the application since "
>                             + "no reset policy is configured",
>                             errorMessage);
>                     throw new OffsetOutOfRangeException(errorMessage,
>                             Collections.singletonMap(tp, position.offset));
>                 }
>             }
>         } else {
>             log.debug("Unset the preferred read replica {} for partition {} "
>                     + "since we got {} when fetching {}",
>                     clearedReplicaId.get(), tp, error, fetchOffset);
>         }
>     }
>     . . .
> }
> {code}
> The question is: why is the {{OFFSET_OUT_OF_RANGE}} error ignored
> whenever the following condition is true?
> {code:java}
> if (position == null || fetchOffset != position.offset) {
> {code}
> It is also odd that this staleness check is applied only to the
> {{OFFSET_OUT_OF_RANGE}} error rather than to every error.
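
To make the behavior in question concrete, here is a minimal standalone sketch of the decision taken in the leader-fetch branch. It is a simplified model, not Kafka code: the class StaleFetchSketch, the Outcome enum, and the onOffsetOutOfRange method are hypothetical names introduced for illustration. It also assumes that fetchOffset is the offset the fetch was originally issued at, as the test comments describe.

```java
import java.util.OptionalLong;

// Hypothetical, simplified model of the OFFSET_OUT_OF_RANGE branch.
// None of these names are Kafka APIs.
class StaleFetchSketch {
    enum Outcome { DISCARD_STALE, RESET_OFFSET, RAISE_TO_APPLICATION }

    /**
     * @param fetchOffset     offset the fetch was issued at (assumption)
     * @param currentPosition consumer's current position, empty if unknown
     * @param hasResetPolicy  whether a default offset reset policy exists
     */
    static Outcome onOffsetOutOfRange(long fetchOffset,
                                      OptionalLong currentPosition,
                                      boolean hasResetPolicy) {
        // Same shape as: position == null || fetchOffset != position.offset
        if (!currentPosition.isPresent()
                || currentPosition.getAsLong() != fetchOffset) {
            return Outcome.DISCARD_STALE;
        }
        return hasResetPolicy
                ? Outcome.RESET_OFFSET
                : Outcome.RAISE_TO_APPLICATION;
    }

    public static void main(String[] args) {
        // Test scenario: fetch sent at offset 0, then seek to offset 2,
        // with OffsetResetStrategy.NONE (no reset policy).
        System.out.println(
                onOffsetOutOfRange(0L, OptionalLong.of(2L), false));
        // Same fetch without the intervening seek.
        System.out.println(
                onOffsetOutOfRange(0L, OptionalLong.of(0L), false));
    }
}
```

In this model the intervening seek alone decides whether the error is dropped or surfaced, which is the behavior the issue asks about.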