mumrah commented on code in PR #6731:
URL: https://github.com/apache/kafka/pull/6731#discussion_r1964210916


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java:
##########
@@ -1136,24 +1163,42 @@ private PartitionRecords parseCompletedFetch(CompletedFetch completedFetch) {
                     log.trace("Updating last stable offset for partition {} to {}", tp, partition.lastStableOffset);
                     subscriptions.updateLastStableOffset(tp, partition.lastStableOffset);
                 }
+
+                if (partition.preferredReadReplica.isPresent()) {
+                    subscriptions.updatePreferredReadReplica(partitionRecords.partition, partition.preferredReadReplica.get(), () -> {
+                        long expireTimeMs = time.milliseconds() + metadata.metadataExpireMs();
+                        log.debug("Updating preferred read replica for partition {} to {}, set to expire at {}",
+                                tp, partition.preferredReadReplica.get(), expireTimeMs);
+                        return expireTimeMs;
+                    });
+                }
+
             } else if (error == Errors.NOT_LEADER_FOR_PARTITION ||
                        error == Errors.REPLICA_NOT_AVAILABLE ||
                        error == Errors.KAFKA_STORAGE_ERROR ||
-                       error == Errors.FENCED_LEADER_EPOCH) {
+                       error == Errors.FENCED_LEADER_EPOCH ||
+                       error == Errors.OFFSET_NOT_AVAILABLE) {
                 log.debug("Error in fetch for partition {}: {}", tp, error.exceptionName());
                 this.metadata.requestUpdate();
             } else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
                 log.warn("Received unknown topic or partition error in fetch for partition {}", tp);
                 this.metadata.requestUpdate();
             } else if (error == Errors.OFFSET_OUT_OF_RANGE) {
-                if (fetchOffset != subscriptions.position(tp).offset) {
-                    log.debug("Discarding stale fetch response for partition {} since the fetched offset {} " +
-                            "does not match the current offset {}", tp, fetchOffset, subscriptions.position(tp));
-                } else if (subscriptions.hasDefaultOffsetResetPolicy()) {
-                    log.info("Fetch offset {} is out of range for partition {}, resetting offset", fetchOffset, tp);
-                    subscriptions.requestOffsetReset(tp);
+                Optional<Integer> clearedReplicaId = subscriptions.clearPreferredReadReplica(tp);

Review Comment:
   @nvartolomei can you file a Jira ticket or ask this question on the mailing list?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
