mumrah commented on code in PR #6731: URL: https://github.com/apache/kafka/pull/6731#discussion_r1964210916
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java: ########## @@ -1136,24 +1163,42 @@ private PartitionRecords parseCompletedFetch(CompletedFetch completedFetch) { log.trace("Updating last stable offset for partition {} to {}", tp, partition.lastStableOffset); subscriptions.updateLastStableOffset(tp, partition.lastStableOffset); } + + if (partition.preferredReadReplica.isPresent()) { + subscriptions.updatePreferredReadReplica(partitionRecords.partition, partition.preferredReadReplica.get(), () -> { + long expireTimeMs = time.milliseconds() + metadata.metadataExpireMs(); + log.debug("Updating preferred read replica for partition {} to {}, set to expire at {}", + tp, partition.preferredReadReplica.get(), expireTimeMs); + return expireTimeMs; + }); + } + } else if (error == Errors.NOT_LEADER_FOR_PARTITION || error == Errors.REPLICA_NOT_AVAILABLE || error == Errors.KAFKA_STORAGE_ERROR || - error == Errors.FENCED_LEADER_EPOCH) { + error == Errors.FENCED_LEADER_EPOCH || + error == Errors.OFFSET_NOT_AVAILABLE) { log.debug("Error in fetch for partition {}: {}", tp, error.exceptionName()); this.metadata.requestUpdate(); } else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) { log.warn("Received unknown topic or partition error in fetch for partition {}", tp); this.metadata.requestUpdate(); } else if (error == Errors.OFFSET_OUT_OF_RANGE) { - if (fetchOffset != subscriptions.position(tp).offset) { - log.debug("Discarding stale fetch response for partition {} since the fetched offset {} " + - "does not match the current offset {}", tp, fetchOffset, subscriptions.position(tp)); - } else if (subscriptions.hasDefaultOffsetResetPolicy()) { - log.info("Fetch offset {} is out of range for partition {}, resetting offset", fetchOffset, tp); - subscriptions.requestOffsetReset(tp); + Optional<Integer> clearedReplicaId = subscriptions.clearPreferredReadReplica(tp); Review Comment: @nvartolomei can you file a Jira ticket or ask this question on the mailing list? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org