nvartolomei commented on code in PR #6731:
URL: https://github.com/apache/kafka/pull/6731#discussion_r1912507814
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java:
##########
@@ -1136,24 +1163,42 @@ private PartitionRecords parseCompletedFetch(CompletedFetch completedFetch) {
                     log.trace("Updating last stable offset for partition {} to {}", tp, partition.lastStableOffset);
                     subscriptions.updateLastStableOffset(tp, partition.lastStableOffset);
                 }
+
+                if (partition.preferredReadReplica.isPresent()) {
+                    subscriptions.updatePreferredReadReplica(partitionRecords.partition, partition.preferredReadReplica.get(), () -> {
+                        long expireTimeMs = time.milliseconds() + metadata.metadataExpireMs();
+                        log.debug("Updating preferred read replica for partition {} to {}, set to expire at {}",
+                            tp, partition.preferredReadReplica.get(), expireTimeMs);
+                        return expireTimeMs;
+                    });
+                }
+
             } else if (error == Errors.NOT_LEADER_FOR_PARTITION ||
                        error == Errors.REPLICA_NOT_AVAILABLE ||
                        error == Errors.KAFKA_STORAGE_ERROR ||
-                       error == Errors.FENCED_LEADER_EPOCH) {
+                       error == Errors.FENCED_LEADER_EPOCH ||
+                       error == Errors.OFFSET_NOT_AVAILABLE) {
                 log.debug("Error in fetch for partition {}: {}", tp, error.exceptionName());
                 this.metadata.requestUpdate();
             } else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
                 log.warn("Received unknown topic or partition error in fetch for partition {}", tp);
                 this.metadata.requestUpdate();
             } else if (error == Errors.OFFSET_OUT_OF_RANGE) {
-                if (fetchOffset != subscriptions.position(tp).offset) {
-                    log.debug("Discarding stale fetch response for partition {} since the fetched offset {} " +
-                        "does not match the current offset {}", tp, fetchOffset, subscriptions.position(tp));
-                } else if (subscriptions.hasDefaultOffsetResetPolicy()) {
-                    log.info("Fetch offset {} is out of range for partition {}, resetting offset", fetchOffset, tp);
-                    subscriptions.requestOffsetReset(tp);
+                Optional<Integer> clearedReplicaId = subscriptions.clearPreferredReadReplica(tp);
Review Comment:
I know this is late, but I'm not sure of a better place to ask this question. Please redirect me if this isn't the appropriate venue.
I'm reading through
https://cwiki.apache.org/confluence/display/KAFKA/KIP-392%3A+Allow+consumers+to+fetch+from+closest+replica
and this implementation doesn't seem to match the KIP wording.
> The log start offset for the leader is not guaranteed to be present on the follower we are fetching from and we do not want the consumer to get stuck in a reset loop while we wait for convergence. So to simplify handling, the consumer will use the earliest offset on whatever replica it is fetching from.
* If the reset policy is "earliest," fetch the log start offset of the current replica that raised the out of range error.
* If the reset policy is "latest," fetch the log end offset from the leader.
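To make the contrast concrete, here is how I read that wording, as a minimal standalone sketch rather than the actual Fetcher code (the class, method, and parameter names are invented for illustration; the real consumer would obtain these offsets via ListOffsets from the respective brokers):

```java
import org.apache.kafka.clients.consumer.OffsetResetStrategy;

// Sketch only, not the actual Fetcher code: the KIP's OFFSET_OUT_OF_RANGE handling as I read it.
final class Kip392ResetSketch {
    static long resetOffsetFor(OffsetResetStrategy strategy,
                               long replicaLogStartOffset,  // log start offset of the replica that raised the error
                               long leaderLogEndOffset) {   // log end offset of the leader
        if (strategy == OffsetResetStrategy.EARLIEST) {
            // "fetch the log start offset of the current replica that raised the out of range error"
            return replicaLogStartOffset;
        } else if (strategy == OffsetResetStrategy.LATEST) {
            // "fetch the log end offset from the leader"
            return leaderLogEndOffset;
        }
        // No default reset policy: the error should surface to the application instead.
        throw new IllegalStateException("no default offset reset policy");
    }
}
```

The diff above, by contrast, appears to clear the preferred read replica on OFFSET_OUT_OF_RANGE and then go through the usual reset path.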
The KIP seems to have been followed at least by the franz-go implementation, which is not incorrect. Is there anywhere I can find an explanation of the current algorithm?
Specifically, I wonder what should happen when the follower applies retention rules ahead of the leader and the log start offsets diverge for a bit; a small made-up example is sketched below.
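For example, with invented numbers:

```java
// Made-up numbers purely to illustrate the divergence scenario above; not real Kafka code.
final class RetentionDivergenceExample {
    public static void main(String[] args) {
        long leaderLogStartOffset = 100L;   // leader still retains records from offset 100
        long followerLogStartOffset = 150L; // follower has already applied retention up to 150
        long consumerPosition = 120L;       // valid on the leader, but out of range on the follower

        // Per the KIP wording with reset policy "earliest", the consumer would continue from the
        // follower's log start offset (150) and skip 120-149, which the leader still retains.
        // Resetting against the leader instead would rewind to 100 and re-deliver 100-119.
        System.out.printf("leader logStartOffset=%d, follower logStartOffset=%d, consumer position=%d%n",
                leaderLogStartOffset, followerLogStartOffset, consumerPosition);
    }
}
```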
Thank you 🙇‍♂️
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]