nvartolomei commented on code in PR #6731:
URL: https://github.com/apache/kafka/pull/6731#discussion_r1912507814


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java:
##########
@@ -1136,24 +1163,42 @@ private PartitionRecords parseCompletedFetch(CompletedFetch completedFetch) {
                     log.trace("Updating last stable offset for partition {} to {}", tp, partition.lastStableOffset);
                     subscriptions.updateLastStableOffset(tp, partition.lastStableOffset);
                 }
+
+                if (partition.preferredReadReplica.isPresent()) {
+                    subscriptions.updatePreferredReadReplica(partitionRecords.partition, partition.preferredReadReplica.get(), () -> {
+                        long expireTimeMs = time.milliseconds() + metadata.metadataExpireMs();
+                        log.debug("Updating preferred read replica for partition {} to {}, set to expire at {}",
+                                tp, partition.preferredReadReplica.get(), expireTimeMs);
+                        return expireTimeMs;
+                    });
+                }
+
             } else if (error == Errors.NOT_LEADER_FOR_PARTITION ||
                        error == Errors.REPLICA_NOT_AVAILABLE ||
                        error == Errors.KAFKA_STORAGE_ERROR ||
-                       error == Errors.FENCED_LEADER_EPOCH) {
+                       error == Errors.FENCED_LEADER_EPOCH ||
+                       error == Errors.OFFSET_NOT_AVAILABLE) {
                 log.debug("Error in fetch for partition {}: {}", tp, error.exceptionName());
                 this.metadata.requestUpdate();
             } else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
                 log.warn("Received unknown topic or partition error in fetch for partition {}", tp);
                 this.metadata.requestUpdate();
             } else if (error == Errors.OFFSET_OUT_OF_RANGE) {
-                if (fetchOffset != subscriptions.position(tp).offset) {
-                    log.debug("Discarding stale fetch response for partition {} since the fetched offset {} " +
-                            "does not match the current offset {}", tp, fetchOffset, subscriptions.position(tp));
-                } else if (subscriptions.hasDefaultOffsetResetPolicy()) {
-                    log.info("Fetch offset {} is out of range for partition {}, resetting offset", fetchOffset, tp);
-                    subscriptions.requestOffsetReset(tp);
+                Optional<Integer> clearedReplicaId = subscriptions.clearPreferredReadReplica(tp);

Review Comment:
   I know this is late, but I'm not sure of a better place to ask this question. Please redirect me if it's not appropriate.
   
   I'm reading through https://cwiki.apache.org/confluence/display/KAFKA/KIP-392%3A+Allow+consumers+to+fetch+from+closest+replica and this implementation doesn't seem to match the KIP wording.
   
   > The log start offset for the leader is not guaranteed to be present on the follower we are fetching from and we do not want the consumer to get stuck in a reset loop while we wait for convergence. So to simplify handling, the consumer will use the earliest offset on whatever replica it is fetching from.
   * If the reset policy is "earliest," fetch the log start offset of the current replica that raised the out of range error.
   * If the reset policy is "latest," fetch the log end offset from the leader.
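   
   Just to check my reading, here is a minimal sketch of how I interpret that wording (purely illustrative; `ResetPolicy`, `ReplicaView` and `resetTarget` are hypothetical names, not the actual `Fetcher` code):
   
   ```java
   import java.util.OptionalLong;
   
   /**
    * Illustrative sketch of the KIP-392 wording quoted above, not Kafka's implementation.
    */
   public class Kip392ResetSketch {
   
       enum ResetPolicy { EARLIEST, LATEST, NONE }
   
       /** Minimal stand-in for "a replica we can ask for offsets". */
       interface ReplicaView {
           long logStartOffset();
           long logEndOffset();
       }
   
       /**
        * On an out-of-range fetch from a follower, the KIP text says:
        * "earliest" resets to the current replica's log start offset,
        * while "latest" resets to the leader's log end offset.
        */
       static OptionalLong resetTarget(ResetPolicy policy, ReplicaView currentReplica, ReplicaView leader) {
           switch (policy) {
               case EARLIEST:
                   return OptionalLong.of(currentReplica.logStartOffset());
               case LATEST:
                   return OptionalLong.of(leader.logEndOffset());
               default:
                   // No default reset policy: surface the out-of-range error to the application.
                   return OptionalLong.empty();
           }
       }
   }
   ```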
   
   The KIP seems to have been followed at least by the franz-go implementation, which does not look incorrect. Is there anywhere I can find an explanation for the current algorithm?
   
   Specifically, I do wonder what should happen when the follower applies retention rules ahead of the leader and the log start offsets diverge for a bit.
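   
   To make that concrete with made-up numbers (plain local variables below, not Kafka APIs): say the consumer's position is 120, the leader's log start offset is still 100, but the follower has already applied retention so its log start offset is 150.
   
   ```java
   public class DivergenceExample {
       public static void main(String[] args) {
           long consumerPosition = 120L;   // next offset the consumer wants to fetch
           long leaderLogStart = 100L;     // leader has not applied retention yet
           long followerLogStart = 150L;   // follower truncated ahead of the leader
   
           // Fetching from the follower raises OFFSET_OUT_OF_RANGE,
           // even though offset 120 is still readable on the leader.
           boolean outOfRangeOnFollower = consumerPosition < followerLogStart;
   
           // KIP wording with reset policy "earliest": jump to the follower's
           // log start offset, silently skipping offsets 120..149.
           long kipEarliestTarget = followerLogStart;
   
           // The diff above instead clears the preferred read replica (the
           // follow-up handling is not in this hunk), presumably so the next
           // fetch goes back to the leader, where 120 is still in range.
           boolean stillValidOnLeader = consumerPosition >= leaderLogStart;
   
           System.out.printf("out of range on follower: %b%n", outOfRangeOnFollower);
           System.out.printf("KIP \"earliest\" would reset to %d, skipping %d records%n",
                   kipEarliestTarget, kipEarliestTarget - consumerPosition);
           System.out.printf("offset still valid on leader: %b%n", stillValidOnLeader);
       }
   }
   ```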
   
   Thank you 🙇‍♂️ 


